1# Copyright (C) 2006-2012 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17import bz2
18import os
19import re
20import sys
21import zlib
22
23from .. import (
24    bencode,
25    branch,
26    bzr as _mod_bzr,
27    config as _mod_config,
28    controldir,
29    debug,
30    errors,
31    gpg,
32    graph,
33    lock,
34    lockdir,
35    osutils,
36    registry,
37    repository as _mod_repository,
38    revision as _mod_revision,
39    urlutils,
40    )
41from . import (
42    branch as bzrbranch,
43    bzrdir as _mod_bzrdir,
44    inventory_delta,
45    testament as _mod_testament,
46    vf_repository,
47    vf_search,
48    )
49from .branch import BranchReferenceFormat
50from ..branch import BranchWriteLockResult
51from ..decorators import only_raises
52from ..errors import (
53    NoSuchRevision,
54    SmartProtocolError,
55    )
56from ..i18n import gettext
57from .inventory import Inventory
58from .inventorytree import InventoryRevisionTree
59from ..lockable_files import LockableFiles
60from .smart import client, vfs, repository as smart_repo
61from .smart.client import _SmartClient
62from ..revision import NULL_REVISION
63from ..repository import RepositoryWriteLockResult, _LazyListJoin
64from .serializer import format_registry as serializer_format_registry
65from ..trace import mutter, note, warning, log_exception_quietly
66from .versionedfile import FulltextContentFactory
67
68
69_DEFAULT_SEARCH_DEPTH = 100
70
71
72class _RpcHelper(object):
73    """Mixin class that helps with issuing RPCs."""
74
75    def _call(self, method, *args, **err_context):
76        try:
77            return self._client.call(method, *args)
78        except errors.ErrorFromSmartServer as err:
79            self._translate_error(err, **err_context)
80
81    def _call_expecting_body(self, method, *args, **err_context):
82        try:
83            return self._client.call_expecting_body(method, *args)
84        except errors.ErrorFromSmartServer as err:
85            self._translate_error(err, **err_context)
86
87    def _call_with_body_bytes(self, method, args, body_bytes, **err_context):
88        try:
89            return self._client.call_with_body_bytes(method, args, body_bytes)
90        except errors.ErrorFromSmartServer as err:
91            self._translate_error(err, **err_context)
92
93    def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
94                                             **err_context):
95        try:
96            return self._client.call_with_body_bytes_expecting_body(
97                method, args, body_bytes)
98        except errors.ErrorFromSmartServer as err:
99            self._translate_error(err, **err_context)
100
101
102def response_tuple_to_repo_format(response):
103    """Convert a response tuple describing a repository format to a format."""
104    format = RemoteRepositoryFormat()
105    format._rich_root_data = (response[0] == b'yes')
106    format._supports_tree_reference = (response[1] == b'yes')
107    format._supports_external_lookups = (response[2] == b'yes')
108    format._network_name = response[3]
109    return format
110
111
112# Note that RemoteBzrDirProber lives in breezy.bzrdir so breezy.bzr.remote
113# does not have to be imported unless a remote format is involved.
114
115class RemoteBzrDirFormat(_mod_bzrdir.BzrDirMetaFormat1):
116    """Format representing bzrdirs accessed via a smart server"""
117
118    supports_workingtrees = False
119
120    colocated_branches = False
121
122    def __init__(self):
123        _mod_bzrdir.BzrDirMetaFormat1.__init__(self)
124        # XXX: It's a bit ugly that the network name is here, because we'd
125        # like to believe that format objects are stateless or at least
126        # immutable,  However, we do at least avoid mutating the name after
127        # it's returned.  See <https://bugs.launchpad.net/bzr/+bug/504102>
128        self._network_name = None
129
130    def __repr__(self):
131        return "%s(_network_name=%r)" % (self.__class__.__name__,
132                                         self._network_name)
133
134    def get_format_description(self):
135        if self._network_name:
136            try:
137                real_format = controldir.network_format_registry.get(
138                    self._network_name)
139            except KeyError:
140                pass
141            else:
142                return 'Remote: ' + real_format.get_format_description()
143        return 'bzr remote bzrdir'
144
145    def get_format_string(self):
146        raise NotImplementedError(self.get_format_string)
147
148    def network_name(self):
149        if self._network_name:
150            return self._network_name
151        else:
152            raise AssertionError("No network name set.")
153
154    def initialize_on_transport(self, transport):
155        try:
156            # hand off the request to the smart server
157            client_medium = transport.get_smart_medium()
158        except errors.NoSmartMedium:
159            # TODO: lookup the local format from a server hint.
160            local_dir_format = _mod_bzrdir.BzrDirMetaFormat1()
161            return local_dir_format.initialize_on_transport(transport)
162        client = _SmartClient(client_medium)
163        path = client.remote_path_from_transport(transport)
164        try:
165            response = client.call(b'BzrDirFormat.initialize', path)
166        except errors.ErrorFromSmartServer as err:
167            _translate_error(err, path=path)
168        if response[0] != b'ok':
169            raise errors.SmartProtocolError(
170                'unexpected response code %s' % (response,))
171        format = RemoteBzrDirFormat()
172        self._supply_sub_formats_to(format)
173        return RemoteBzrDir(transport, format)
174
175    def parse_NoneTrueFalse(self, arg):
176        if not arg:
177            return None
178        if arg == b'False':
179            return False
180        if arg == b'True':
181            return True
182        raise AssertionError("invalid arg %r" % arg)
183
184    def _serialize_NoneTrueFalse(self, arg):
185        if arg is False:
186            return b'False'
187        if arg:
188            return b'True'
189        return b''
190
191    def _serialize_NoneString(self, arg):
192        return arg or b''
193
194    def initialize_on_transport_ex(self, transport, use_existing_dir=False,
195                                   create_prefix=False, force_new_repo=False, stacked_on=None,
196                                   stack_on_pwd=None, repo_format_name=None, make_working_trees=None,
197                                   shared_repo=False):
198        try:
199            # hand off the request to the smart server
200            client_medium = transport.get_smart_medium()
201        except errors.NoSmartMedium:
202            do_vfs = True
203        else:
204            # Decline to open it if the server doesn't support our required
205            # version (3) so that the VFS-based transport will do it.
206            if client_medium.should_probe():
207                try:
208                    server_version = client_medium.protocol_version()
209                    if server_version != '2':
210                        do_vfs = True
211                    else:
212                        do_vfs = False
213                except errors.SmartProtocolError:
214                    # Apparently there's no usable smart server there, even though
215                    # the medium supports the smart protocol.
216                    do_vfs = True
217            else:
218                do_vfs = False
219        if not do_vfs:
220            client = _SmartClient(client_medium)
221            path = client.remote_path_from_transport(transport)
222            if client_medium._is_remote_before((1, 16)):
223                do_vfs = True
224        if do_vfs:
225            # TODO: lookup the local format from a server hint.
226            local_dir_format = _mod_bzrdir.BzrDirMetaFormat1()
227            self._supply_sub_formats_to(local_dir_format)
228            return local_dir_format.initialize_on_transport_ex(transport,
229                                                               use_existing_dir=use_existing_dir, create_prefix=create_prefix,
230                                                               force_new_repo=force_new_repo, stacked_on=stacked_on,
231                                                               stack_on_pwd=stack_on_pwd, repo_format_name=repo_format_name,
232                                                               make_working_trees=make_working_trees, shared_repo=shared_repo,
233                                                               vfs_only=True)
234        return self._initialize_on_transport_ex_rpc(client, path, transport,
235                                                    use_existing_dir, create_prefix, force_new_repo, stacked_on,
236                                                    stack_on_pwd, repo_format_name, make_working_trees, shared_repo)
237
238    def _initialize_on_transport_ex_rpc(self, client, path, transport,
239                                        use_existing_dir, create_prefix, force_new_repo, stacked_on,
240                                        stack_on_pwd, repo_format_name, make_working_trees, shared_repo):
241        args = []
242        args.append(self._serialize_NoneTrueFalse(use_existing_dir))
243        args.append(self._serialize_NoneTrueFalse(create_prefix))
244        args.append(self._serialize_NoneTrueFalse(force_new_repo))
245        args.append(self._serialize_NoneString(stacked_on))
246        # stack_on_pwd is often/usually our transport
247        if stack_on_pwd:
248            try:
249                stack_on_pwd = transport.relpath(stack_on_pwd).encode('utf-8')
250                if not stack_on_pwd:
251                    stack_on_pwd = b'.'
252            except errors.PathNotChild:
253                pass
254        args.append(self._serialize_NoneString(stack_on_pwd))
255        args.append(self._serialize_NoneString(repo_format_name))
256        args.append(self._serialize_NoneTrueFalse(make_working_trees))
257        args.append(self._serialize_NoneTrueFalse(shared_repo))
258        request_network_name = self._network_name or \
259            _mod_bzrdir.BzrDirFormat.get_default_format().network_name()
260        try:
261            response = client.call(b'BzrDirFormat.initialize_ex_1.16',
262                                   request_network_name, path, *args)
263        except errors.UnknownSmartMethod:
264            client._medium._remember_remote_is_before((1, 16))
265            local_dir_format = _mod_bzrdir.BzrDirMetaFormat1()
266            self._supply_sub_formats_to(local_dir_format)
267            return local_dir_format.initialize_on_transport_ex(transport,
268                                                               use_existing_dir=use_existing_dir, create_prefix=create_prefix,
269                                                               force_new_repo=force_new_repo, stacked_on=stacked_on,
270                                                               stack_on_pwd=stack_on_pwd, repo_format_name=repo_format_name,
271                                                               make_working_trees=make_working_trees, shared_repo=shared_repo,
272                                                               vfs_only=True)
273        except errors.ErrorFromSmartServer as err:
274            _translate_error(err, path=path.decode('utf-8'))
275        repo_path = response[0]
276        bzrdir_name = response[6]
277        require_stacking = response[7]
278        require_stacking = self.parse_NoneTrueFalse(require_stacking)
279        format = RemoteBzrDirFormat()
280        format._network_name = bzrdir_name
281        self._supply_sub_formats_to(format)
282        bzrdir = RemoteBzrDir(transport, format, _client=client)
283        if repo_path:
284            repo_format = response_tuple_to_repo_format(response[1:])
285            if repo_path == b'.':
286                repo_path = b''
287            repo_path = repo_path.decode('utf-8')
288            if repo_path:
289                repo_bzrdir_format = RemoteBzrDirFormat()
290                repo_bzrdir_format._network_name = response[5]
291                repo_bzr = RemoteBzrDir(transport.clone(repo_path),
292                                        repo_bzrdir_format)
293            else:
294                repo_bzr = bzrdir
295            final_stack = response[8] or None
296            if final_stack:
297                final_stack = final_stack.decode('utf-8')
298            final_stack_pwd = response[9] or None
299            if final_stack_pwd:
300                final_stack_pwd = urlutils.join(
301                    transport.base, final_stack_pwd.decode('utf-8'))
302            remote_repo = RemoteRepository(repo_bzr, repo_format)
303            if len(response) > 10:
304                # Updated server verb that locks remotely.
305                repo_lock_token = response[10] or None
306                remote_repo.lock_write(repo_lock_token, _skip_rpc=True)
307                if repo_lock_token:
308                    remote_repo.dont_leave_lock_in_place()
309            else:
310                remote_repo.lock_write()
311            policy = _mod_bzrdir.UseExistingRepository(remote_repo,
312                                                       final_stack, final_stack_pwd, require_stacking)
313            policy.acquire_repository()
314        else:
315            remote_repo = None
316            policy = None
317        bzrdir._format.set_branch_format(self.get_branch_format())
318        if require_stacking:
319            # The repo has already been created, but we need to make sure that
320            # we'll make a stackable branch.
321            bzrdir._format.require_stacking(_skip_repo=True)
322        return remote_repo, bzrdir, require_stacking, policy
323
324    def _open(self, transport):
325        return RemoteBzrDir(transport, self)
326
327    def __eq__(self, other):
328        if not isinstance(other, RemoteBzrDirFormat):
329            return False
330        return self.get_format_description() == other.get_format_description()
331
332    def __return_repository_format(self):
333        # Always return a RemoteRepositoryFormat object, but if a specific bzr
334        # repository format has been asked for, tell the RemoteRepositoryFormat
335        # that it should use that for init() etc.
336        result = RemoteRepositoryFormat()
337        custom_format = getattr(self, '_repository_format', None)
338        if custom_format:
339            if isinstance(custom_format, RemoteRepositoryFormat):
340                return custom_format
341            else:
342                # We will use the custom format to create repositories over the
343                # wire; expose its details like rich_root_data for code to
344                # query
345                result._custom_format = custom_format
346        return result
347
348    def get_branch_format(self):
349        result = _mod_bzrdir.BzrDirMetaFormat1.get_branch_format(self)
350        if not isinstance(result, RemoteBranchFormat):
351            new_result = RemoteBranchFormat()
352            new_result._custom_format = result
353            # cache the result
354            self.set_branch_format(new_result)
355            result = new_result
356        return result
357
358    repository_format = property(__return_repository_format,
359                                 _mod_bzrdir.BzrDirMetaFormat1._set_repository_format)  # .im_func)
360
361
362class RemoteControlStore(_mod_config.IniFileStore):
363    """Control store which attempts to use HPSS calls to retrieve control store.
364
365    Note that this is specific to bzr-based formats.
366    """
367
368    def __init__(self, bzrdir):
369        super(RemoteControlStore, self).__init__()
370        self.controldir = bzrdir
371        self._real_store = None
372
373    def lock_write(self, token=None):
374        self._ensure_real()
375        return self._real_store.lock_write(token)
376
377    def unlock(self):
378        self._ensure_real()
379        return self._real_store.unlock()
380
381    def save(self):
382        with self.lock_write():
383            # We need to be able to override the undecorated implementation
384            self.save_without_locking()
385
386    def save_without_locking(self):
387        super(RemoteControlStore, self).save()
388
389    def _ensure_real(self):
390        self.controldir._ensure_real()
391        if self._real_store is None:
392            self._real_store = _mod_config.ControlStore(self.controldir)
393
394    def external_url(self):
395        return urlutils.join(self.branch.user_url, 'control.conf')
396
397    def _load_content(self):
398        medium = self.controldir._client._medium
399        path = self.controldir._path_for_remote_call(self.controldir._client)
400        try:
401            response, handler = self.controldir._call_expecting_body(
402                b'BzrDir.get_config_file', path)
403        except errors.UnknownSmartMethod:
404            self._ensure_real()
405            return self._real_store._load_content()
406        if len(response) and response[0] != b'ok':
407            raise errors.UnexpectedSmartServerResponse(response)
408        return handler.read_body_bytes()
409
410    def _save_content(self, content):
411        # FIXME JRV 2011-11-22: Ideally this should use a
412        # HPSS call too, but at the moment it is not possible
413        # to write lock control directories.
414        self._ensure_real()
415        return self._real_store._save_content(content)
416
417
418class RemoteBzrDir(_mod_bzrdir.BzrDir, _RpcHelper):
419    """Control directory on a remote server, accessed via bzr:// or similar."""
420
421    def __init__(self, transport, format, _client=None, _force_probe=False):
422        """Construct a RemoteBzrDir.
423
424        :param _client: Private parameter for testing. Disables probing and the
425            use of a real bzrdir.
426        """
427        _mod_bzrdir.BzrDir.__init__(self, transport, format)
428        # this object holds a delegated bzrdir that uses file-level operations
429        # to talk to the other side
430        self._real_bzrdir = None
431        self._has_working_tree = None
432        # 1-shot cache for the call pattern 'create_branch; open_branch' - see
433        # create_branch for details.
434        self._next_open_branch_result = None
435
436        if _client is None:
437            medium = transport.get_smart_medium()
438            self._client = client._SmartClient(medium)
439        else:
440            self._client = _client
441            if not _force_probe:
442                return
443
444        self._probe_bzrdir()
445
446    def __repr__(self):
447        return '%s(%r)' % (self.__class__.__name__, self._client)
448
449    def _probe_bzrdir(self):
450        medium = self._client._medium
451        path = self._path_for_remote_call(self._client)
452        if medium._is_remote_before((2, 1)):
453            self._rpc_open(path)
454            return
455        try:
456            self._rpc_open_2_1(path)
457            return
458        except errors.UnknownSmartMethod:
459            medium._remember_remote_is_before((2, 1))
460            self._rpc_open(path)
461
462    def _rpc_open_2_1(self, path):
463        response = self._call(b'BzrDir.open_2.1', path)
464        if response == (b'no',):
465            raise errors.NotBranchError(path=self.root_transport.base)
466        elif response[0] == b'yes':
467            if response[1] == b'yes':
468                self._has_working_tree = True
469            elif response[1] == b'no':
470                self._has_working_tree = False
471            else:
472                raise errors.UnexpectedSmartServerResponse(response)
473        else:
474            raise errors.UnexpectedSmartServerResponse(response)
475
476    def _rpc_open(self, path):
477        response = self._call(b'BzrDir.open', path)
478        if response not in [(b'yes',), (b'no',)]:
479            raise errors.UnexpectedSmartServerResponse(response)
480        if response == (b'no',):
481            raise errors.NotBranchError(path=self.root_transport.base)
482
483    def _ensure_real(self):
484        """Ensure that there is a _real_bzrdir set.
485
486        Used before calls to self._real_bzrdir.
487        """
488        if not self._real_bzrdir:
489            if 'hpssvfs' in debug.debug_flags:
490                import traceback
491                warning('VFS BzrDir access triggered\n%s',
492                        ''.join(traceback.format_stack()))
493            self._real_bzrdir = _mod_bzrdir.BzrDir.open_from_transport(
494                self.root_transport, probers=[_mod_bzr.BzrProber])
495            self._format._network_name = \
496                self._real_bzrdir._format.network_name()
497
498    def _translate_error(self, err, **context):
499        _translate_error(err, bzrdir=self, **context)
500
501    def break_lock(self):
502        # Prevent aliasing problems in the next_open_branch_result cache.
503        # See create_branch for rationale.
504        self._next_open_branch_result = None
505        return _mod_bzrdir.BzrDir.break_lock(self)
506
507    def _vfs_checkout_metadir(self):
508        self._ensure_real()
509        return self._real_bzrdir.checkout_metadir()
510
511    def checkout_metadir(self):
512        """Retrieve the controldir format to use for checkouts of this one.
513        """
514        medium = self._client._medium
515        if medium._is_remote_before((2, 5)):
516            return self._vfs_checkout_metadir()
517        path = self._path_for_remote_call(self._client)
518        try:
519            response = self._client.call(b'BzrDir.checkout_metadir',
520                                         path)
521        except errors.UnknownSmartMethod:
522            medium._remember_remote_is_before((2, 5))
523            return self._vfs_checkout_metadir()
524        if len(response) != 3:
525            raise errors.UnexpectedSmartServerResponse(response)
526        control_name, repo_name, branch_name = response
527        try:
528            format = controldir.network_format_registry.get(control_name)
529        except KeyError:
530            raise errors.UnknownFormatError(kind='control',
531                                            format=control_name)
532        if repo_name:
533            try:
534                repo_format = _mod_repository.network_format_registry.get(
535                    repo_name)
536            except KeyError:
537                raise errors.UnknownFormatError(kind='repository',
538                                                format=repo_name)
539            format.repository_format = repo_format
540        if branch_name:
541            try:
542                format.set_branch_format(
543                    branch.network_format_registry.get(branch_name))
544            except KeyError:
545                raise errors.UnknownFormatError(kind='branch',
546                                                format=branch_name)
547        return format
548
549    def _vfs_cloning_metadir(self, require_stacking=False):
550        self._ensure_real()
551        return self._real_bzrdir.cloning_metadir(
552            require_stacking=require_stacking)
553
554    def cloning_metadir(self, require_stacking=False):
555        medium = self._client._medium
556        if medium._is_remote_before((1, 13)):
557            return self._vfs_cloning_metadir(require_stacking=require_stacking)
558        verb = b'BzrDir.cloning_metadir'
559        if require_stacking:
560            stacking = b'True'
561        else:
562            stacking = b'False'
563        path = self._path_for_remote_call(self._client)
564        try:
565            response = self._call(verb, path, stacking)
566        except errors.UnknownSmartMethod:
567            medium._remember_remote_is_before((1, 13))
568            return self._vfs_cloning_metadir(require_stacking=require_stacking)
569        except errors.UnknownErrorFromSmartServer as err:
570            if err.error_tuple != (b'BranchReference',):
571                raise
572            # We need to resolve the branch reference to determine the
573            # cloning_metadir.  This causes unnecessary RPCs to open the
574            # referenced branch (and bzrdir, etc) but only when the caller
575            # didn't already resolve the branch reference.
576            referenced_branch = self.open_branch()
577            return referenced_branch.controldir.cloning_metadir()
578        if len(response) != 3:
579            raise errors.UnexpectedSmartServerResponse(response)
580        control_name, repo_name, branch_info = response
581        if len(branch_info) != 2:
582            raise errors.UnexpectedSmartServerResponse(response)
583        branch_ref, branch_name = branch_info
584        try:
585            format = controldir.network_format_registry.get(control_name)
586        except KeyError:
587            raise errors.UnknownFormatError(
588                kind='control', format=control_name)
589
590        if repo_name:
591            try:
592                format.repository_format = _mod_repository.network_format_registry.get(
593                    repo_name)
594            except KeyError:
595                raise errors.UnknownFormatError(kind='repository',
596                                                format=repo_name)
597        if branch_ref == b'ref':
598            # XXX: we need possible_transports here to avoid reopening the
599            # connection to the referenced location
600            ref_bzrdir = _mod_bzrdir.BzrDir.open(branch_name)
601            branch_format = ref_bzrdir.cloning_metadir().get_branch_format()
602            format.set_branch_format(branch_format)
603        elif branch_ref == b'branch':
604            if branch_name:
605                try:
606                    branch_format = branch.network_format_registry.get(
607                        branch_name)
608                except KeyError:
609                    raise errors.UnknownFormatError(kind='branch',
610                                                    format=branch_name)
611                format.set_branch_format(branch_format)
612        else:
613            raise errors.UnexpectedSmartServerResponse(response)
614        return format
615
616    def create_repository(self, shared=False):
617        # as per meta1 formats - just delegate to the format object which may
618        # be parameterised.
619        result = self._format.repository_format.initialize(self, shared)
620        if not isinstance(result, RemoteRepository):
621            return self.open_repository()
622        else:
623            return result
624
625    def destroy_repository(self):
626        """See BzrDir.destroy_repository"""
627        path = self._path_for_remote_call(self._client)
628        try:
629            response = self._call(b'BzrDir.destroy_repository', path)
630        except errors.UnknownSmartMethod:
631            self._ensure_real()
632            self._real_bzrdir.destroy_repository()
633            return
634        if response[0] != b'ok':
635            raise SmartProtocolError(
636                'unexpected response code %s' % (response,))
637
638    def create_branch(self, name=None, repository=None,
639                      append_revisions_only=None):
640        if name is None:
641            name = self._get_selected_branch()
642        if name != "":
643            raise errors.NoColocatedBranchSupport(self)
644        # as per meta1 formats - just delegate to the format object which may
645        # be parameterised.
646        real_branch = self._format.get_branch_format().initialize(self,
647                                                                  name=name, repository=repository,
648                                                                  append_revisions_only=append_revisions_only)
649        if not isinstance(real_branch, RemoteBranch):
650            if not isinstance(repository, RemoteRepository):
651                raise AssertionError(
652                    'need a RemoteRepository to use with RemoteBranch, got %r'
653                    % (repository,))
654            result = RemoteBranch(self, repository, real_branch, name=name)
655        else:
656            result = real_branch
657        # BzrDir.clone_on_transport() uses the result of create_branch but does
658        # not return it to its callers; we save approximately 8% of our round
659        # trips by handing the branch we created back to the first caller to
660        # open_branch rather than probing anew. Long term we need a API in
661        # bzrdir that doesn't discard result objects (like result_branch).
662        # RBC 20090225
663        self._next_open_branch_result = result
664        return result
665
666    def destroy_branch(self, name=None):
667        """See BzrDir.destroy_branch"""
668        if name is None:
669            name = self._get_selected_branch()
670        if name != "":
671            raise errors.NoColocatedBranchSupport(self)
672        path = self._path_for_remote_call(self._client)
673        try:
674            if name != "":
675                args = (name, )
676            else:
677                args = ()
678            response = self._call(b'BzrDir.destroy_branch', path, *args)
679        except errors.UnknownSmartMethod:
680            self._ensure_real()
681            self._real_bzrdir.destroy_branch(name=name)
682            self._next_open_branch_result = None
683            return
684        self._next_open_branch_result = None
685        if response[0] != b'ok':
686            raise SmartProtocolError(
687                'unexpected response code %s' % (response,))
688
689    def create_workingtree(self, revision_id=None, from_branch=None,
690                           accelerator_tree=None, hardlink=False):
691        raise errors.NotLocalUrl(self.transport.base)
692
693    def find_branch_format(self, name=None):
694        """Find the branch 'format' for this bzrdir.
695
696        This might be a synthetic object for e.g. RemoteBranch and SVN.
697        """
698        b = self.open_branch(name=name)
699        return b._format
700
701    def branch_names(self):
702        path = self._path_for_remote_call(self._client)
703        try:
704            response, handler = self._call_expecting_body(
705                b'BzrDir.get_branches', path)
706        except errors.UnknownSmartMethod:
707            self._ensure_real()
708            return self._real_bzrdir.branch_names()
709        if response[0] != b"success":
710            raise errors.UnexpectedSmartServerResponse(response)
711        body = bencode.bdecode(handler.read_body_bytes())
712        ret = []
713        for name, value in body.items():
714            name = name.decode('utf-8')
715            ret.append(name)
716        return ret
717
718    def get_branches(self, possible_transports=None, ignore_fallbacks=False):
719        path = self._path_for_remote_call(self._client)
720        try:
721            response, handler = self._call_expecting_body(
722                b'BzrDir.get_branches', path)
723        except errors.UnknownSmartMethod:
724            self._ensure_real()
725            return self._real_bzrdir.get_branches()
726        if response[0] != b"success":
727            raise errors.UnexpectedSmartServerResponse(response)
728        body = bencode.bdecode(handler.read_body_bytes())
729        ret = {}
730        for name, value in body.items():
731            name = name.decode('utf-8')
732            ret[name] = self._open_branch(
733                name, value[0].decode('ascii'), value[1],
734                possible_transports=possible_transports,
735                ignore_fallbacks=ignore_fallbacks)
736        return ret
737
738    def set_branch_reference(self, target_branch, name=None):
739        """See BzrDir.set_branch_reference()."""
740        if name is None:
741            name = self._get_selected_branch()
742        if name != "":
743            raise errors.NoColocatedBranchSupport(self)
744        self._ensure_real()
745        return self._real_bzrdir.set_branch_reference(target_branch, name=name)
746
747    def get_branch_reference(self, name=None):
748        """See BzrDir.get_branch_reference()."""
749        if name is None:
750            name = self._get_selected_branch()
751        if name != "":
752            raise errors.NoColocatedBranchSupport(self)
753        response = self._get_branch_reference()
754        if response[0] == 'ref':
755            return response[1].decode('utf-8')
756        else:
757            return None
758
759    def _get_branch_reference(self):
760        """Get branch reference information
761
762        :return: Tuple with (kind, location_or_format)
763            if kind == 'ref', then location_or_format contains a location
764            otherwise, it contains a format name
765        """
766        path = self._path_for_remote_call(self._client)
767        medium = self._client._medium
768        candidate_calls = [
769            (b'BzrDir.open_branchV3', (2, 1)),
770            (b'BzrDir.open_branchV2', (1, 13)),
771            (b'BzrDir.open_branch', None),
772            ]
773        for verb, required_version in candidate_calls:
774            if required_version and medium._is_remote_before(required_version):
775                continue
776            try:
777                response = self._call(verb, path)
778            except errors.UnknownSmartMethod:
779                if required_version is None:
780                    raise
781                medium._remember_remote_is_before(required_version)
782            else:
783                break
784        if verb == b'BzrDir.open_branch':
785            if response[0] != b'ok':
786                raise errors.UnexpectedSmartServerResponse(response)
787            if response[1] != b'':
788                return ('ref', response[1])
789            else:
790                return ('branch', b'')
791        if response[0] not in (b'ref', b'branch'):
792            raise errors.UnexpectedSmartServerResponse(response)
793        return (response[0].decode('ascii'), response[1])
794
795    def _get_tree_branch(self, name=None):
796        """See BzrDir._get_tree_branch()."""
797        return None, self.open_branch(name=name)
798
799    def _open_branch(self, name, kind, location_or_format,
800                     ignore_fallbacks=False, possible_transports=None):
801        if kind == 'ref':
802            # a branch reference, use the existing BranchReference logic.
803            format = BranchReferenceFormat()
804            ref_loc = urlutils.join(self.user_url, location_or_format.decode('utf-8'))
805            return format.open(self, name=name, _found=True,
806                               location=ref_loc,
807                               ignore_fallbacks=ignore_fallbacks,
808                               possible_transports=possible_transports)
809        branch_format_name = location_or_format
810        if not branch_format_name:
811            branch_format_name = None
812        format = RemoteBranchFormat(network_name=branch_format_name)
813        return RemoteBranch(self, self.find_repository(), format=format,
814                            setup_stacking=not ignore_fallbacks, name=name,
815                            possible_transports=possible_transports)
816
817    def open_branch(self, name=None, unsupported=False,
818                    ignore_fallbacks=False, possible_transports=None):
819        if name is None:
820            name = self._get_selected_branch()
821        if name != "":
822            raise errors.NoColocatedBranchSupport(self)
823        if unsupported:
824            raise NotImplementedError(
825                'unsupported flag support not implemented yet.')
826        if self._next_open_branch_result is not None:
827            # See create_branch for details.
828            result = self._next_open_branch_result
829            self._next_open_branch_result = None
830            return result
831        response = self._get_branch_reference()
832        return self._open_branch(name, response[0], response[1],
833                                 possible_transports=possible_transports,
834                                 ignore_fallbacks=ignore_fallbacks)
835
836    def _open_repo_v1(self, path):
837        verb = b'BzrDir.find_repository'
838        response = self._call(verb, path)
839        if response[0] != b'ok':
840            raise errors.UnexpectedSmartServerResponse(response)
841        # servers that only support the v1 method don't support external
842        # references either.
843        self._ensure_real()
844        repo = self._real_bzrdir.open_repository()
845        response = response + (b'no', repo._format.network_name())
846        return response, repo
847
848    def _open_repo_v2(self, path):
849        verb = b'BzrDir.find_repositoryV2'
850        response = self._call(verb, path)
851        if response[0] != b'ok':
852            raise errors.UnexpectedSmartServerResponse(response)
853        self._ensure_real()
854        repo = self._real_bzrdir.open_repository()
855        response = response + (repo._format.network_name(),)
856        return response, repo
857
858    def _open_repo_v3(self, path):
859        verb = b'BzrDir.find_repositoryV3'
860        medium = self._client._medium
861        if medium._is_remote_before((1, 13)):
862            raise errors.UnknownSmartMethod(verb)
863        try:
864            response = self._call(verb, path)
865        except errors.UnknownSmartMethod:
866            medium._remember_remote_is_before((1, 13))
867            raise
868        if response[0] != b'ok':
869            raise errors.UnexpectedSmartServerResponse(response)
870        return response, None
871
872    def open_repository(self):
873        path = self._path_for_remote_call(self._client)
874        response = None
875        for probe in [self._open_repo_v3, self._open_repo_v2,
876                      self._open_repo_v1]:
877            try:
878                response, real_repo = probe(path)
879                break
880            except errors.UnknownSmartMethod:
881                pass
882        if response is None:
883            raise errors.UnknownSmartMethod(b'BzrDir.find_repository{3,2,}')
884        if response[0] != b'ok':
885            raise errors.UnexpectedSmartServerResponse(response)
886        if len(response) != 6:
887            raise SmartProtocolError(
888                'incorrect response length %s' % (response,))
889        if response[1] == b'':
890            # repo is at this dir.
891            format = response_tuple_to_repo_format(response[2:])
892            # Used to support creating a real format instance when needed.
893            format._creating_bzrdir = self
894            remote_repo = RemoteRepository(self, format)
895            format._creating_repo = remote_repo
896            if real_repo is not None:
897                remote_repo._set_real_repository(real_repo)
898            return remote_repo
899        else:
900            raise errors.NoRepositoryPresent(self)
901
902    def has_workingtree(self):
903        if self._has_working_tree is None:
904            path = self._path_for_remote_call(self._client)
905            try:
906                response = self._call(b'BzrDir.has_workingtree', path)
907            except errors.UnknownSmartMethod:
908                self._ensure_real()
909                self._has_working_tree = self._real_bzrdir.has_workingtree()
910            else:
911                if response[0] not in (b'yes', b'no'):
912                    raise SmartProtocolError(
913                        'unexpected response code %s' % (response,))
914                self._has_working_tree = (response[0] == b'yes')
915        return self._has_working_tree
916
917    def open_workingtree(self, recommend_upgrade=True):
918        if self.has_workingtree():
919            raise errors.NotLocalUrl(self.root_transport)
920        else:
921            raise errors.NoWorkingTree(self.root_transport.base)
922
923    def _path_for_remote_call(self, client):
924        """Return the path to be used for this bzrdir in a remote call."""
925        remote_path = client.remote_path_from_transport(self.root_transport)
926        remote_path = remote_path.decode('utf-8')
927        base_url, segment_parameters = urlutils.split_segment_parameters_raw(
928            remote_path)
929        base_url = base_url.encode('utf-8')
930        return base_url
931
932    def get_branch_transport(self, branch_format, name=None):
933        self._ensure_real()
934        return self._real_bzrdir.get_branch_transport(branch_format, name=name)
935
936    def get_repository_transport(self, repository_format):
937        self._ensure_real()
938        return self._real_bzrdir.get_repository_transport(repository_format)
939
940    def get_workingtree_transport(self, workingtree_format):
941        self._ensure_real()
942        return self._real_bzrdir.get_workingtree_transport(workingtree_format)
943
944    def can_convert_format(self):
945        """Upgrading of remote bzrdirs is not supported yet."""
946        return False
947
948    def needs_format_conversion(self, format):
949        """Upgrading of remote bzrdirs is not supported yet."""
950        return False
951
952    def _get_config(self):
953        return RemoteBzrDirConfig(self)
954
955    def _get_config_store(self):
956        return RemoteControlStore(self)
957
958
959class RemoteInventoryTree(InventoryRevisionTree):
960
961    def __init__(self, repository, inv, revision_id):
962        super(RemoteInventoryTree, self).__init__(repository, inv, revision_id)
963
964    def archive(self, format, name, root=None, subdir=None, force_mtime=None):
965        ret = self._repository._revision_archive(
966            self.get_revision_id(), format, name, root, subdir,
967            force_mtime=force_mtime)
968        if ret is None:
969            return super(RemoteInventoryTree, self).archive(
970                format, name, root, subdir, force_mtime=force_mtime)
971        return ret
972
973    def annotate_iter(self, path,
974                      default_revision=_mod_revision.CURRENT_REVISION):
975        """Return an iterator of revision_id, line tuples.
976
977        For working trees (and mutable trees in general), the special
978        revision_id 'current:' will be used for lines that are new in this
979        tree, e.g. uncommitted changes.
980        :param default_revision: For lines that don't match a basis, mark them
981            with this revision id. Not all implementations will make use of
982            this value.
983        """
984        ret = self._repository._annotate_file_revision(
985            self.get_revision_id(), path, file_id=None,
986            default_revision=default_revision)
987        if ret is None:
988            return super(RemoteInventoryTree, self).annotate_iter(
989                path, default_revision=default_revision)
990        return ret
991
992
993class RemoteRepositoryFormat(vf_repository.VersionedFileRepositoryFormat):
994    """Format for repositories accessed over a _SmartClient.
995
996    Instances of this repository are represented by RemoteRepository
997    instances.
998
999    The RemoteRepositoryFormat is parameterized during construction
1000    to reflect the capabilities of the real, remote format. Specifically
1001    the attributes rich_root_data and supports_tree_reference are set
1002    on a per instance basis, and are not set (and should not be) at
1003    the class level.
1004
1005    :ivar _custom_format: If set, a specific concrete repository format that
1006        will be used when initializing a repository with this
1007        RemoteRepositoryFormat.
1008    :ivar _creating_repo: If set, the repository object that this
1009        RemoteRepositoryFormat was created for: it can be called into
1010        to obtain data like the network name.
1011    """
1012
1013    _matchingcontroldir = RemoteBzrDirFormat()
1014    supports_full_versioned_files = True
1015    supports_leaving_lock = True
1016    supports_overriding_transport = False
1017
1018    def __init__(self):
1019        _mod_repository.RepositoryFormat.__init__(self)
1020        self._custom_format = None
1021        self._network_name = None
1022        self._creating_bzrdir = None
1023        self._revision_graph_can_have_wrong_parents = None
1024        self._supports_chks = None
1025        self._supports_external_lookups = None
1026        self._supports_tree_reference = None
1027        self._supports_funky_characters = None
1028        self._supports_nesting_repositories = None
1029        self._rich_root_data = None
1030
1031    def __repr__(self):
1032        return "%s(_network_name=%r)" % (self.__class__.__name__,
1033                                         self._network_name)
1034
1035    @property
1036    def fast_deltas(self):
1037        self._ensure_real()
1038        return self._custom_format.fast_deltas
1039
1040    @property
1041    def rich_root_data(self):
1042        if self._rich_root_data is None:
1043            self._ensure_real()
1044            self._rich_root_data = self._custom_format.rich_root_data
1045        return self._rich_root_data
1046
1047    @property
1048    def supports_chks(self):
1049        if self._supports_chks is None:
1050            self._ensure_real()
1051            self._supports_chks = self._custom_format.supports_chks
1052        return self._supports_chks
1053
1054    @property
1055    def supports_external_lookups(self):
1056        if self._supports_external_lookups is None:
1057            self._ensure_real()
1058            self._supports_external_lookups = \
1059                self._custom_format.supports_external_lookups
1060        return self._supports_external_lookups
1061
1062    @property
1063    def supports_funky_characters(self):
1064        if self._supports_funky_characters is None:
1065            self._ensure_real()
1066            self._supports_funky_characters = \
1067                self._custom_format.supports_funky_characters
1068        return self._supports_funky_characters
1069
1070    @property
1071    def supports_nesting_repositories(self):
1072        if self._supports_nesting_repositories is None:
1073            self._ensure_real()
1074            self._supports_nesting_repositories = \
1075                self._custom_format.supports_nesting_repositories
1076        return self._supports_nesting_repositories
1077
1078    @property
1079    def supports_tree_reference(self):
1080        if self._supports_tree_reference is None:
1081            self._ensure_real()
1082            self._supports_tree_reference = \
1083                self._custom_format.supports_tree_reference
1084        return self._supports_tree_reference
1085
1086    @property
1087    def revision_graph_can_have_wrong_parents(self):
1088        if self._revision_graph_can_have_wrong_parents is None:
1089            self._ensure_real()
1090            self._revision_graph_can_have_wrong_parents = \
1091                self._custom_format.revision_graph_can_have_wrong_parents
1092        return self._revision_graph_can_have_wrong_parents
1093
1094    def _vfs_initialize(self, a_controldir, shared):
1095        """Helper for common code in initialize."""
1096        if self._custom_format:
1097            # Custom format requested
1098            result = self._custom_format.initialize(
1099                a_controldir, shared=shared)
1100        elif self._creating_bzrdir is not None:
1101            # Use the format that the repository we were created to back
1102            # has.
1103            prior_repo = self._creating_bzrdir.open_repository()
1104            prior_repo._ensure_real()
1105            result = prior_repo._real_repository._format.initialize(
1106                a_controldir, shared=shared)
1107        else:
1108            # assume that a_bzr is a RemoteBzrDir but the smart server didn't
1109            # support remote initialization.
1110            # We delegate to a real object at this point (as RemoteBzrDir
1111            # delegate to the repository format which would lead to infinite
1112            # recursion if we just called a_controldir.create_repository.
1113            a_controldir._ensure_real()
1114            result = a_controldir._real_bzrdir.create_repository(shared=shared)
1115        if not isinstance(result, RemoteRepository):
1116            return self.open(a_controldir)
1117        else:
1118            return result
1119
1120    def initialize(self, a_controldir, shared=False):
1121        # Being asked to create on a non RemoteBzrDir:
1122        if not isinstance(a_controldir, RemoteBzrDir):
1123            return self._vfs_initialize(a_controldir, shared)
1124        medium = a_controldir._client._medium
1125        if medium._is_remote_before((1, 13)):
1126            return self._vfs_initialize(a_controldir, shared)
1127        # Creating on a remote bzr dir.
1128        # 1) get the network name to use.
1129        if self._custom_format:
1130            network_name = self._custom_format.network_name()
1131        elif self._network_name:
1132            network_name = self._network_name
1133        else:
1134            # Select the current breezy default and ask for that.
1135            reference_bzrdir_format = controldir.format_registry.get(
1136                'default')()
1137            reference_format = reference_bzrdir_format.repository_format
1138            network_name = reference_format.network_name()
1139        # 2) try direct creation via RPC
1140        path = a_controldir._path_for_remote_call(a_controldir._client)
1141        verb = b'BzrDir.create_repository'
1142        if shared:
1143            shared_str = b'True'
1144        else:
1145            shared_str = b'False'
1146        try:
1147            response = a_controldir._call(verb, path, network_name, shared_str)
1148        except errors.UnknownSmartMethod:
1149            # Fallback - use vfs methods
1150            medium._remember_remote_is_before((1, 13))
1151            return self._vfs_initialize(a_controldir, shared)
1152        else:
1153            # Turn the response into a RemoteRepository object.
1154            format = response_tuple_to_repo_format(response[1:])
1155            # Used to support creating a real format instance when needed.
1156            format._creating_bzrdir = a_controldir
1157            remote_repo = RemoteRepository(a_controldir, format)
1158            format._creating_repo = remote_repo
1159            return remote_repo
1160
1161    def open(self, a_controldir):
1162        if not isinstance(a_controldir, RemoteBzrDir):
1163            raise AssertionError('%r is not a RemoteBzrDir' % (a_controldir,))
1164        return a_controldir.open_repository()
1165
1166    def _ensure_real(self):
1167        if self._custom_format is None:
1168            try:
1169                self._custom_format = _mod_repository.network_format_registry.get(
1170                    self._network_name)
1171            except KeyError:
1172                raise errors.UnknownFormatError(kind='repository',
1173                                                format=self._network_name)
1174
1175    @property
1176    def _fetch_order(self):
1177        self._ensure_real()
1178        return self._custom_format._fetch_order
1179
1180    @property
1181    def _fetch_uses_deltas(self):
1182        self._ensure_real()
1183        return self._custom_format._fetch_uses_deltas
1184
1185    @property
1186    def _fetch_reconcile(self):
1187        self._ensure_real()
1188        return self._custom_format._fetch_reconcile
1189
1190    def get_format_description(self):
1191        self._ensure_real()
1192        return 'Remote: ' + self._custom_format.get_format_description()
1193
1194    def __eq__(self, other):
1195        return self.__class__ is other.__class__
1196
1197    def network_name(self):
1198        if self._network_name:
1199            return self._network_name
1200        self._creating_repo._ensure_real()
1201        return self._creating_repo._real_repository._format.network_name()
1202
1203    @property
1204    def pack_compresses(self):
1205        self._ensure_real()
1206        return self._custom_format.pack_compresses
1207
1208    @property
1209    def _serializer(self):
1210        self._ensure_real()
1211        return self._custom_format._serializer
1212
1213
1214class RemoteRepository(_mod_repository.Repository, _RpcHelper,
1215                       lock._RelockDebugMixin):
1216    """Repository accessed over rpc.
1217
1218    For the moment most operations are performed using local transport-backed
1219    Repository objects.
1220    """
1221
1222    def __init__(self, remote_bzrdir, format, real_repository=None, _client=None):
1223        """Create a RemoteRepository instance.
1224
1225        :param remote_bzrdir: The bzrdir hosting this repository.
1226        :param format: The RemoteFormat object to use.
1227        :param real_repository: If not None, a local implementation of the
1228            repository logic for the repository, usually accessing the data
1229            via the VFS.
1230        :param _client: Private testing parameter - override the smart client
1231            to be used by the repository.
1232        """
1233        if real_repository:
1234            self._real_repository = real_repository
1235        else:
1236            self._real_repository = None
1237        self.controldir = remote_bzrdir
1238        if _client is None:
1239            self._client = remote_bzrdir._client
1240        else:
1241            self._client = _client
1242        self._format = format
1243        self._lock_mode = None
1244        self._lock_token = None
1245        self._write_group_tokens = None
1246        self._lock_count = 0
1247        self._leave_lock = False
1248        # Cache of revision parents; misses are cached during read locks, and
1249        # write locks when no _real_repository has been set.
1250        self._unstacked_provider = graph.CachingParentsProvider(
1251            get_parent_map=self._get_parent_map_rpc)
1252        self._unstacked_provider.disable_cache()
1253        # For tests:
1254        # These depend on the actual remote format, so force them off for
1255        # maximum compatibility. XXX: In future these should depend on the
1256        # remote repository instance, but this is irrelevant until we perform
1257        # reconcile via an RPC call.
1258        self._reconcile_does_inventory_gc = False
1259        self._reconcile_fixes_text_parents = False
1260        self._reconcile_backsup_inventory = False
1261        self.base = self.controldir.transport.base
1262        # Additional places to query for data.
1263        self._fallback_repositories = []
1264
1265    @property
1266    def user_transport(self):
1267        return self.controldir.user_transport
1268
1269    @property
1270    def control_transport(self):
1271        # XXX: Normally you shouldn't directly get at the remote repository
1272        # transport, but I'm not sure it's worth making this method
1273        # optional -- mbp 2010-04-21
1274        return self.controldir.get_repository_transport(None)
1275
1276    def __str__(self):
1277        return "%s(%s)" % (self.__class__.__name__, self.base)
1278
1279    __repr__ = __str__
1280
1281    def abort_write_group(self, suppress_errors=False):
1282        """Complete a write group on the decorated repository.
1283
1284        Smart methods perform operations in a single step so this API
1285        is not really applicable except as a compatibility thunk
1286        for older plugins that don't use e.g. the CommitBuilder
1287        facility.
1288
1289        :param suppress_errors: see Repository.abort_write_group.
1290        """
1291        if self._real_repository:
1292            self._ensure_real()
1293            return self._real_repository.abort_write_group(
1294                suppress_errors=suppress_errors)
1295        if not self.is_in_write_group():
1296            if suppress_errors:
1297                mutter('(suppressed) not in write group')
1298                return
1299            raise errors.BzrError("not in write group")
1300        path = self.controldir._path_for_remote_call(self._client)
1301        try:
1302            response = self._call(b'Repository.abort_write_group', path,
1303                                  self._lock_token,
1304                                  [token.encode('utf-8') for token in self._write_group_tokens])
1305        except Exception as exc:
1306            self._write_group = None
1307            if not suppress_errors:
1308                raise
1309            mutter('abort_write_group failed')
1310            log_exception_quietly()
1311            note(gettext('bzr: ERROR (ignored): %s'), exc)
1312        else:
1313            if response != (b'ok', ):
1314                raise errors.UnexpectedSmartServerResponse(response)
1315            self._write_group_tokens = None
1316
1317    @property
1318    def chk_bytes(self):
1319        """Decorate the real repository for now.
1320
1321        In the long term a full blown network facility is needed to avoid
1322        creating a real repository object locally.
1323        """
1324        self._ensure_real()
1325        return self._real_repository.chk_bytes
1326
1327    def commit_write_group(self):
1328        """Complete a write group on the decorated repository.
1329
1330        Smart methods perform operations in a single step so this API
1331        is not really applicable except as a compatibility thunk
1332        for older plugins that don't use e.g. the CommitBuilder
1333        facility.
1334        """
1335        if self._real_repository:
1336            self._ensure_real()
1337            return self._real_repository.commit_write_group()
1338        if not self.is_in_write_group():
1339            raise errors.BzrError("not in write group")
1340        path = self.controldir._path_for_remote_call(self._client)
1341        response = self._call(b'Repository.commit_write_group', path,
1342                              self._lock_token, [token.encode('utf-8') for token in self._write_group_tokens])
1343        if response != (b'ok', ):
1344            raise errors.UnexpectedSmartServerResponse(response)
1345        self._write_group_tokens = None
1346        # Refresh data after writing to the repository.
1347        self.refresh_data()
1348
1349    def resume_write_group(self, tokens):
1350        if self._real_repository:
1351            return self._real_repository.resume_write_group(tokens)
1352        path = self.controldir._path_for_remote_call(self._client)
1353        try:
1354            response = self._call(b'Repository.check_write_group', path,
1355                                  self._lock_token, [token.encode('utf-8') for token in tokens])
1356        except errors.UnknownSmartMethod:
1357            self._ensure_real()
1358            return self._real_repository.resume_write_group(tokens)
1359        if response != (b'ok', ):
1360            raise errors.UnexpectedSmartServerResponse(response)
1361        self._write_group_tokens = tokens
1362
1363    def suspend_write_group(self):
1364        if self._real_repository:
1365            return self._real_repository.suspend_write_group()
1366        ret = self._write_group_tokens or []
1367        self._write_group_tokens = None
1368        return ret
1369
1370    def get_missing_parent_inventories(self, check_for_missing_texts=True):
1371        self._ensure_real()
1372        return self._real_repository.get_missing_parent_inventories(
1373            check_for_missing_texts=check_for_missing_texts)
1374
1375    def _get_rev_id_for_revno_vfs(self, revno, known_pair):
1376        self._ensure_real()
1377        return self._real_repository.get_rev_id_for_revno(
1378            revno, known_pair)
1379
1380    def get_rev_id_for_revno(self, revno, known_pair):
1381        """See Repository.get_rev_id_for_revno."""
1382        path = self.controldir._path_for_remote_call(self._client)
1383        try:
1384            if self._client._medium._is_remote_before((1, 17)):
1385                return self._get_rev_id_for_revno_vfs(revno, known_pair)
1386            response = self._call(
1387                b'Repository.get_rev_id_for_revno', path, revno, known_pair)
1388        except errors.UnknownSmartMethod:
1389            self._client._medium._remember_remote_is_before((1, 17))
1390            return self._get_rev_id_for_revno_vfs(revno, known_pair)
1391        except errors.UnknownErrorFromSmartServer as e:
1392            # Older versions of Bazaar/Breezy (<< 3.0.0) would raise a
1393            # ValueError instead of returning revno-outofbounds
1394            if len(e.error_tuple) < 3:
1395                raise
1396            if e.error_tuple[:2] != (b'error', b'ValueError'):
1397                raise
1398            m = re.match(
1399                br"requested revno \(([0-9]+)\) is later than given "
1400                br"known revno \(([0-9]+)\)", e.error_tuple[2])
1401            if not m:
1402                raise
1403            raise errors.RevnoOutOfBounds(
1404                int(m.group(1)), (0, int(m.group(2))))
1405        if response[0] == b'ok':
1406            return True, response[1]
1407        elif response[0] == b'history-incomplete':
1408            known_pair = response[1:3]
1409            for fallback in self._fallback_repositories:
1410                found, result = fallback.get_rev_id_for_revno(
1411                    revno, known_pair)
1412                if found:
1413                    return True, result
1414                else:
1415                    known_pair = result
1416            # Not found in any fallbacks
1417            return False, known_pair
1418        else:
1419            raise errors.UnexpectedSmartServerResponse(response)
1420
1421    def _ensure_real(self):
1422        """Ensure that there is a _real_repository set.
1423
1424        Used before calls to self._real_repository.
1425
1426        Note that _ensure_real causes many roundtrips to the server which are
1427        not desirable, and prevents the use of smart one-roundtrip RPC's to
1428        perform complex operations (such as accessing parent data, streaming
1429        revisions etc). Adding calls to _ensure_real should only be done when
1430        bringing up new functionality, adding fallbacks for smart methods that
1431        require a fallback path, and never to replace an existing smart method
1432        invocation. If in doubt chat to the bzr network team.
1433        """
1434        if self._real_repository is None:
1435            if 'hpssvfs' in debug.debug_flags:
1436                import traceback
1437                warning('VFS Repository access triggered\n%s',
1438                        ''.join(traceback.format_stack()))
1439            self._unstacked_provider.missing_keys.clear()
1440            self.controldir._ensure_real()
1441            self._set_real_repository(
1442                self.controldir._real_bzrdir.open_repository())
1443
1444    def _translate_error(self, err, **context):
1445        self.controldir._translate_error(err, repository=self, **context)
1446
1447    def find_text_key_references(self):
1448        """Find the text key references within the repository.
1449
1450        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
1451            to whether they were referred to by the inventory of the
1452            revision_id that they contain. The inventory texts from all present
1453            revision ids are assessed to generate this report.
1454        """
1455        self._ensure_real()
1456        return self._real_repository.find_text_key_references()
1457
1458    def _generate_text_key_index(self):
1459        """Generate a new text key index for the repository.
1460
1461        This is an expensive function that will take considerable time to run.
1462
1463        :return: A dict mapping (file_id, revision_id) tuples to a list of
1464            parents, also (file_id, revision_id) tuples.
1465        """
1466        self._ensure_real()
1467        return self._real_repository._generate_text_key_index()
1468
1469    def _get_revision_graph(self, revision_id):
1470        """Private method for using with old (< 1.2) servers to fallback."""
1471        if revision_id is None:
1472            revision_id = b''
1473        elif _mod_revision.is_null(revision_id):
1474            return {}
1475
1476        path = self.controldir._path_for_remote_call(self._client)
1477        response = self._call_expecting_body(
1478            b'Repository.get_revision_graph', path, revision_id)
1479        response_tuple, response_handler = response
1480        if response_tuple[0] != b'ok':
1481            raise errors.UnexpectedSmartServerResponse(response_tuple)
1482        coded = response_handler.read_body_bytes()
1483        if coded == b'':
1484            # no revisions in this repository!
1485            return {}
1486        lines = coded.split(b'\n')
1487        revision_graph = {}
1488        for line in lines:
1489            d = tuple(line.split())
1490            revision_graph[d[0]] = d[1:]
1491
1492        return revision_graph
1493
1494    def _get_sink(self):
1495        """See Repository._get_sink()."""
1496        return RemoteStreamSink(self)
1497
1498    def _get_source(self, to_format):
1499        """Return a source for streaming from this repository."""
1500        return RemoteStreamSource(self, to_format)
1501
1502    def get_file_graph(self):
1503        with self.lock_read():
1504            return graph.Graph(self.texts)
1505
1506    def has_revision(self, revision_id):
1507        """True if this repository has a copy of the revision."""
1508        # Copy of breezy.repository.Repository.has_revision
1509        with self.lock_read():
1510            return revision_id in self.has_revisions((revision_id,))
1511
1512    def has_revisions(self, revision_ids):
1513        """Probe to find out the presence of multiple revisions.
1514
1515        :param revision_ids: An iterable of revision_ids.
1516        :return: A set of the revision_ids that were present.
1517        """
1518        with self.lock_read():
1519            # Copy of breezy.repository.Repository.has_revisions
1520            parent_map = self.get_parent_map(revision_ids)
1521            result = set(parent_map)
1522            if _mod_revision.NULL_REVISION in revision_ids:
1523                result.add(_mod_revision.NULL_REVISION)
1524            return result
1525
1526    def _has_same_fallbacks(self, other_repo):
1527        """Returns true if the repositories have the same fallbacks."""
1528        # XXX: copied from Repository; it should be unified into a base class
1529        # <https://bugs.launchpad.net/bzr/+bug/401622>
1530        my_fb = self._fallback_repositories
1531        other_fb = other_repo._fallback_repositories
1532        if len(my_fb) != len(other_fb):
1533            return False
1534        for f, g in zip(my_fb, other_fb):
1535            if not f.has_same_location(g):
1536                return False
1537        return True
1538
1539    def has_same_location(self, other):
1540        # TODO: Move to RepositoryBase and unify with the regular Repository
1541        # one; unfortunately the tests rely on slightly different behaviour at
1542        # present -- mbp 20090710
1543        return (self.__class__ is other.__class__
1544                and self.controldir.transport.base == other.controldir.transport.base)
1545
1546    def get_graph(self, other_repository=None):
1547        """Return the graph for this repository format"""
1548        parents_provider = self._make_parents_provider(other_repository)
1549        return graph.Graph(parents_provider)
1550
1551    def get_known_graph_ancestry(self, revision_ids):
1552        """Return the known graph for a set of revision ids and their ancestors.
1553        """
1554        with self.lock_read():
1555            revision_graph = dict(((key, value) for key, value in
1556                                   self.get_graph().iter_ancestry(revision_ids) if value is not None))
1557            revision_graph = _mod_repository._strip_NULL_ghosts(revision_graph)
1558            return graph.KnownGraph(revision_graph)
1559
1560    def gather_stats(self, revid=None, committers=None):
1561        """See Repository.gather_stats()."""
1562        path = self.controldir._path_for_remote_call(self._client)
1563        # revid can be None to indicate no revisions, not just NULL_REVISION
1564        if revid is None or _mod_revision.is_null(revid):
1565            fmt_revid = b''
1566        else:
1567            fmt_revid = revid
1568        if committers is None or not committers:
1569            fmt_committers = b'no'
1570        else:
1571            fmt_committers = b'yes'
1572        response_tuple, response_handler = self._call_expecting_body(
1573            b'Repository.gather_stats', path, fmt_revid, fmt_committers)
1574        if response_tuple[0] != b'ok':
1575            raise errors.UnexpectedSmartServerResponse(response_tuple)
1576
1577        body = response_handler.read_body_bytes()
1578        result = {}
1579        for line in body.split(b'\n'):
1580            if not line:
1581                continue
1582            key, val_text = line.split(b':')
1583            key = key.decode('ascii')
1584            if key in ('revisions', 'size', 'committers'):
1585                result[key] = int(val_text)
1586            elif key in ('firstrev', 'latestrev'):
1587                values = val_text.split(b' ')[1:]
1588                result[key] = (float(values[0]), int(values[1]))
1589
1590        return result
1591
1592    def find_branches(self, using=False):
1593        """See Repository.find_branches()."""
1594        # should be an API call to the server.
1595        self._ensure_real()
1596        return self._real_repository.find_branches(using=using)
1597
1598    def get_physical_lock_status(self):
1599        """See Repository.get_physical_lock_status()."""
1600        path = self.controldir._path_for_remote_call(self._client)
1601        try:
1602            response = self._call(b'Repository.get_physical_lock_status', path)
1603        except errors.UnknownSmartMethod:
1604            self._ensure_real()
1605            return self._real_repository.get_physical_lock_status()
1606        if response[0] not in (b'yes', b'no'):
1607            raise errors.UnexpectedSmartServerResponse(response)
1608        return (response[0] == b'yes')
1609
1610    def is_in_write_group(self):
1611        """Return True if there is an open write group.
1612
1613        write groups are only applicable locally for the smart server..
1614        """
1615        if self._write_group_tokens is not None:
1616            return True
1617        if self._real_repository:
1618            return self._real_repository.is_in_write_group()
1619
1620    def is_locked(self):
1621        return self._lock_count >= 1
1622
1623    def is_shared(self):
1624        """See Repository.is_shared()."""
1625        path = self.controldir._path_for_remote_call(self._client)
1626        response = self._call(b'Repository.is_shared', path)
1627        if response[0] not in (b'yes', b'no'):
1628            raise SmartProtocolError(
1629                'unexpected response code %s' % (response,))
1630        return response[0] == b'yes'
1631
1632    def is_write_locked(self):
1633        return self._lock_mode == 'w'
1634
1635    def _warn_if_deprecated(self, branch=None):
1636        # If we have a real repository, the check will be done there, if we
1637        # don't the check will be done remotely.
1638        pass
1639
1640    def lock_read(self):
1641        """Lock the repository for read operations.
1642
1643        :return: A breezy.lock.LogicalLockResult.
1644        """
1645        # wrong eventually - want a local lock cache context
1646        if not self._lock_mode:
1647            self._note_lock('r')
1648            self._lock_mode = 'r'
1649            self._lock_count = 1
1650            self._unstacked_provider.enable_cache(cache_misses=True)
1651            if self._real_repository is not None:
1652                self._real_repository.lock_read()
1653            for repo in self._fallback_repositories:
1654                repo.lock_read()
1655        else:
1656            self._lock_count += 1
1657        return lock.LogicalLockResult(self.unlock)
1658
1659    def _remote_lock_write(self, token):
1660        path = self.controldir._path_for_remote_call(self._client)
1661        if token is None:
1662            token = b''
1663        err_context = {'token': token}
1664        response = self._call(b'Repository.lock_write', path, token,
1665                              **err_context)
1666        if response[0] == b'ok':
1667            ok, token = response
1668            return token
1669        else:
1670            raise errors.UnexpectedSmartServerResponse(response)
1671
1672    def lock_write(self, token=None, _skip_rpc=False):
1673        if not self._lock_mode:
1674            self._note_lock('w')
1675            if _skip_rpc:
1676                if self._lock_token is not None:
1677                    if token != self._lock_token:
1678                        raise errors.TokenMismatch(token, self._lock_token)
1679                self._lock_token = token
1680            else:
1681                self._lock_token = self._remote_lock_write(token)
1682            # if self._lock_token is None, then this is something like packs or
1683            # svn where we don't get to lock the repo, or a weave style repository
1684            # where we cannot lock it over the wire and attempts to do so will
1685            # fail.
1686            if self._real_repository is not None:
1687                self._real_repository.lock_write(token=self._lock_token)
1688            if token is not None:
1689                self._leave_lock = True
1690            else:
1691                self._leave_lock = False
1692            self._lock_mode = 'w'
1693            self._lock_count = 1
1694            cache_misses = self._real_repository is None
1695            self._unstacked_provider.enable_cache(cache_misses=cache_misses)
1696            for repo in self._fallback_repositories:
1697                # Writes don't affect fallback repos
1698                repo.lock_read()
1699        elif self._lock_mode == 'r':
1700            raise errors.ReadOnlyError(self)
1701        else:
1702            self._lock_count += 1
1703        return RepositoryWriteLockResult(self.unlock, self._lock_token or None)
1704
1705    def leave_lock_in_place(self):
1706        if not self._lock_token:
1707            raise NotImplementedError(self.leave_lock_in_place)
1708        self._leave_lock = True
1709
1710    def dont_leave_lock_in_place(self):
1711        if not self._lock_token:
1712            raise NotImplementedError(self.dont_leave_lock_in_place)
1713        self._leave_lock = False
1714
1715    def _set_real_repository(self, repository):
1716        """Set the _real_repository for this repository.
1717
1718        :param repository: The repository to fallback to for non-hpss
1719            implemented operations.
1720        """
1721        if self._real_repository is not None:
1722            # Replacing an already set real repository.
1723            # We cannot do this [currently] if the repository is locked -
1724            # synchronised state might be lost.
1725            if self.is_locked():
1726                raise AssertionError('_real_repository is already set')
1727        if isinstance(repository, RemoteRepository):
1728            raise AssertionError()
1729        self._real_repository = repository
1730        # three code paths happen here:
1731        # 1) old servers, RemoteBranch.open() calls _ensure_real before setting
1732        # up stacking. In this case self._fallback_repositories is [], and the
1733        # real repo is already setup. Preserve the real repo and
1734        # RemoteRepository.add_fallback_repository will avoid adding
1735        # duplicates.
1736        # 2) new servers, RemoteBranch.open() sets up stacking, and when
1737        # ensure_real is triggered from a branch, the real repository to
1738        # set already has a matching list with separate instances, but
1739        # as they are also RemoteRepositories we don't worry about making the
1740        # lists be identical.
1741        # 3) new servers, RemoteRepository.ensure_real is triggered before
1742        # RemoteBranch.ensure real, in this case we get a repo with no fallbacks
1743        # and need to populate it.
1744        if (self._fallback_repositories
1745            and len(self._real_repository._fallback_repositories)
1746                != len(self._fallback_repositories)):
1747            if len(self._real_repository._fallback_repositories):
1748                raise AssertionError(
1749                    "cannot cleanly remove existing _fallback_repositories")
1750        for fb in self._fallback_repositories:
1751            self._real_repository.add_fallback_repository(fb)
1752        if self._lock_mode == 'w':
1753            # if we are already locked, the real repository must be able to
1754            # acquire the lock with our token.
1755            self._real_repository.lock_write(self._lock_token)
1756        elif self._lock_mode == 'r':
1757            self._real_repository.lock_read()
1758        if self._write_group_tokens is not None:
1759            # if we are already in a write group, resume it
1760            self._real_repository.resume_write_group(self._write_group_tokens)
1761            self._write_group_tokens = None
1762
1763    def start_write_group(self):
1764        """Start a write group on the decorated repository.
1765
1766        Smart methods perform operations in a single step so this API
1767        is not really applicable except as a compatibility thunk
1768        for older plugins that don't use e.g. the CommitBuilder
1769        facility.
1770        """
1771        if self._real_repository:
1772            self._ensure_real()
1773            return self._real_repository.start_write_group()
1774        if not self.is_write_locked():
1775            raise errors.NotWriteLocked(self)
1776        if self._write_group_tokens is not None:
1777            raise errors.BzrError('already in a write group')
1778        path = self.controldir._path_for_remote_call(self._client)
1779        try:
1780            response = self._call(b'Repository.start_write_group', path,
1781                                  self._lock_token)
1782        except (errors.UnknownSmartMethod, errors.UnsuspendableWriteGroup):
1783            self._ensure_real()
1784            return self._real_repository.start_write_group()
1785        if response[0] != b'ok':
1786            raise errors.UnexpectedSmartServerResponse(response)
1787        self._write_group_tokens = [
1788            token.decode('utf-8') for token in response[1]]
1789
1790    def _unlock(self, token):
1791        path = self.controldir._path_for_remote_call(self._client)
1792        if not token:
1793            # with no token the remote repository is not persistently locked.
1794            return
1795        err_context = {'token': token}
1796        response = self._call(b'Repository.unlock', path, token,
1797                              **err_context)
1798        if response == (b'ok',):
1799            return
1800        else:
1801            raise errors.UnexpectedSmartServerResponse(response)
1802
1803    @only_raises(errors.LockNotHeld, errors.LockBroken)
1804    def unlock(self):
1805        if not self._lock_count:
1806            return lock.cant_unlock_not_held(self)
1807        self._lock_count -= 1
1808        if self._lock_count > 0:
1809            return
1810        self._unstacked_provider.disable_cache()
1811        old_mode = self._lock_mode
1812        self._lock_mode = None
1813        try:
1814            # The real repository is responsible at present for raising an
1815            # exception if it's in an unfinished write group.  However, it
1816            # normally will *not* actually remove the lock from disk - that's
1817            # done by the server on receiving the Repository.unlock call.
1818            # This is just to let the _real_repository stay up to date.
1819            if self._real_repository is not None:
1820                self._real_repository.unlock()
1821            elif self._write_group_tokens is not None:
1822                self.abort_write_group()
1823        finally:
1824            # The rpc-level lock should be released even if there was a
1825            # problem releasing the vfs-based lock.
1826            if old_mode == 'w':
1827                # Only write-locked repositories need to make a remote method
1828                # call to perform the unlock.
1829                old_token = self._lock_token
1830                self._lock_token = None
1831                if not self._leave_lock:
1832                    self._unlock(old_token)
1833        # Fallbacks are always 'lock_read()' so we don't pay attention to
1834        # self._leave_lock
1835        for repo in self._fallback_repositories:
1836            repo.unlock()
1837
1838    def break_lock(self):
1839        # should hand off to the network
1840        path = self.controldir._path_for_remote_call(self._client)
1841        try:
1842            response = self._call(b"Repository.break_lock", path)
1843        except errors.UnknownSmartMethod:
1844            self._ensure_real()
1845            return self._real_repository.break_lock()
1846        if response != (b'ok',):
1847            raise errors.UnexpectedSmartServerResponse(response)
1848
1849    def _get_tarball(self, compression):
1850        """Return a TemporaryFile containing a repository tarball.
1851
1852        Returns None if the server does not support sending tarballs.
1853        """
1854        import tempfile
1855        path = self.controldir._path_for_remote_call(self._client)
1856        try:
1857            response, protocol = self._call_expecting_body(
1858                b'Repository.tarball', path, compression.encode('ascii'))
1859        except errors.UnknownSmartMethod:
1860            protocol.cancel_read_body()
1861            return None
1862        if response[0] == b'ok':
1863            # Extract the tarball and return it
1864            t = tempfile.NamedTemporaryFile()
1865            # TODO: rpc layer should read directly into it...
1866            t.write(protocol.read_body_bytes())
1867            t.seek(0)
1868            return t
1869        raise errors.UnexpectedSmartServerResponse(response)
1870
1871    def sprout(self, to_bzrdir, revision_id=None):
1872        """Create a descendent repository for new development.
1873
1874        Unlike clone, this does not copy the settings of the repository.
1875        """
1876        with self.lock_read():
1877            dest_repo = self._create_sprouting_repo(to_bzrdir, shared=False)
1878            dest_repo.fetch(self, revision_id=revision_id)
1879            return dest_repo
1880
1881    def _create_sprouting_repo(self, a_controldir, shared):
1882        if not isinstance(a_controldir._format, self.controldir._format.__class__):
1883            # use target default format.
1884            dest_repo = a_controldir.create_repository()
1885        else:
1886            # Most control formats need the repository to be specifically
1887            # created, but on some old all-in-one formats it's not needed
1888            try:
1889                dest_repo = self._format.initialize(
1890                    a_controldir, shared=shared)
1891            except errors.UninitializableFormat:
1892                dest_repo = a_controldir.open_repository()
1893        return dest_repo
1894
1895    # These methods are just thin shims to the VFS object for now.
1896
1897    def revision_tree(self, revision_id):
1898        with self.lock_read():
1899            revision_id = _mod_revision.ensure_null(revision_id)
1900            if revision_id == _mod_revision.NULL_REVISION:
1901                return InventoryRevisionTree(self,
1902                                             Inventory(root_id=None), _mod_revision.NULL_REVISION)
1903            else:
1904                return list(self.revision_trees([revision_id]))[0]
1905
1906    def get_serializer_format(self):
1907        path = self.controldir._path_for_remote_call(self._client)
1908        try:
1909            response = self._call(b'VersionedFileRepository.get_serializer_format',
1910                                  path)
1911        except errors.UnknownSmartMethod:
1912            self._ensure_real()
1913            return self._real_repository.get_serializer_format()
1914        if response[0] != b'ok':
1915            raise errors.UnexpectedSmartServerResponse(response)
1916        return response[1]
1917
1918    def get_commit_builder(self, branch, parents, config, timestamp=None,
1919                           timezone=None, committer=None, revprops=None,
1920                           revision_id=None, lossy=False):
1921        """Obtain a CommitBuilder for this repository.
1922
1923        :param branch: Branch to commit to.
1924        :param parents: Revision ids of the parents of the new revision.
1925        :param config: Configuration to use.
1926        :param timestamp: Optional timestamp recorded for commit.
1927        :param timezone: Optional timezone for timestamp.
1928        :param committer: Optional committer to set for commit.
1929        :param revprops: Optional dictionary of revision properties.
1930        :param revision_id: Optional revision id.
1931        :param lossy: Whether to discard data that can not be natively
1932            represented, when pushing to a foreign VCS
1933        """
1934        if self._fallback_repositories and not self._format.supports_chks:
1935            raise errors.BzrError("Cannot commit directly to a stacked branch"
1936                                  " in pre-2a formats. See "
1937                                  "https://bugs.launchpad.net/bzr/+bug/375013 for details.")
1938        commit_builder_kls = vf_repository.VersionedFileCommitBuilder
1939        result = commit_builder_kls(self, parents, config,
1940                                    timestamp, timezone, committer, revprops, revision_id,
1941                                    lossy)
1942        self.start_write_group()
1943        return result
1944
1945    def add_fallback_repository(self, repository):
1946        """Add a repository to use for looking up data not held locally.
1947
1948        :param repository: A repository.
1949        """
1950        if not self._format.supports_external_lookups:
1951            raise errors.UnstackableRepositoryFormat(
1952                self._format.network_name(), self.base)
1953        # We need to accumulate additional repositories here, to pass them in
1954        # on various RPC's.
1955        #
1956        # Make the check before we lock: this raises an exception.
1957        self._check_fallback_repository(repository)
1958        if self.is_locked():
1959            # We will call fallback.unlock() when we transition to the unlocked
1960            # state, so always add a lock here. If a caller passes us a locked
1961            # repository, they are responsible for unlocking it later.
1962            repository.lock_read()
1963        self._fallback_repositories.append(repository)
1964        # If self._real_repository was parameterised already (e.g. because a
1965        # _real_branch had its get_stacked_on_url method called), then the
1966        # repository to be added may already be in the _real_repositories list.
1967        if self._real_repository is not None:
1968            fallback_locations = [repo.user_url for repo in
1969                                  self._real_repository._fallback_repositories]
1970            if repository.user_url not in fallback_locations:
1971                self._real_repository.add_fallback_repository(repository)
1972
1973    def _check_fallback_repository(self, repository):
1974        """Check that this repository can fallback to repository safely.
1975
1976        Raise an error if not.
1977
1978        :param repository: A repository to fallback to.
1979        """
1980        return _mod_repository.InterRepository._assert_same_model(
1981            self, repository)
1982
1983    def add_inventory(self, revid, inv, parents):
1984        self._ensure_real()
1985        return self._real_repository.add_inventory(revid, inv, parents)
1986
1987    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
1988                               parents, basis_inv=None, propagate_caches=False):
1989        self._ensure_real()
1990        return self._real_repository.add_inventory_by_delta(basis_revision_id,
1991                                                            delta, new_revision_id, parents, basis_inv=basis_inv,
1992                                                            propagate_caches=propagate_caches)
1993
1994    def add_revision(self, revision_id, rev, inv=None):
1995        _mod_revision.check_not_reserved_id(revision_id)
1996        key = (revision_id,)
1997        # check inventory present
1998        if not self.inventories.get_parent_map([key]):
1999            if inv is None:
2000                raise errors.WeaveRevisionNotPresent(revision_id,
2001                                                     self.inventories)
2002            else:
2003                # yes, this is not suitable for adding with ghosts.
2004                rev.inventory_sha1 = self.add_inventory(revision_id, inv,
2005                                                        rev.parent_ids)
2006        else:
2007            rev.inventory_sha1 = self.inventories.get_sha1s([key])[key]
2008        self._add_revision(rev)
2009
2010    def _add_revision(self, rev):
2011        if self._real_repository is not None:
2012            return self._real_repository._add_revision(rev)
2013        lines = self._serializer.write_revision_to_lines(rev)
2014        key = (rev.revision_id,)
2015        parents = tuple((parent,) for parent in rev.parent_ids)
2016        self._write_group_tokens, missing_keys = self._get_sink().insert_stream(
2017            [('revisions', [ChunkedContentFactory(key, parents, None, lines, chunks_are_lines=True)])],
2018            self._format, self._write_group_tokens)
2019
2020    def get_inventory(self, revision_id):
2021        with self.lock_read():
2022            return list(self.iter_inventories([revision_id]))[0]
2023
2024    def _iter_inventories_rpc(self, revision_ids, ordering):
2025        if ordering is None:
2026            ordering = 'unordered'
2027        path = self.controldir._path_for_remote_call(self._client)
2028        body = b"\n".join(revision_ids)
2029        response_tuple, response_handler = (
2030            self._call_with_body_bytes_expecting_body(
2031                b"VersionedFileRepository.get_inventories",
2032                (path, ordering.encode('ascii')), body))
2033        if response_tuple[0] != b"ok":
2034            raise errors.UnexpectedSmartServerResponse(response_tuple)
2035        deserializer = inventory_delta.InventoryDeltaDeserializer()
2036        byte_stream = response_handler.read_streamed_body()
2037        decoded = smart_repo._byte_stream_to_stream(byte_stream)
2038        if decoded is None:
2039            # no results whatsoever
2040            return
2041        src_format, stream = decoded
2042        if src_format.network_name() != self._format.network_name():
2043            raise AssertionError(
2044                "Mismatched RemoteRepository and stream src %r, %r" % (
2045                    src_format.network_name(), self._format.network_name()))
2046        # ignore the src format, it's not really relevant
2047        prev_inv = Inventory(root_id=None,
2048                             revision_id=_mod_revision.NULL_REVISION)
2049        # there should be just one substream, with inventory deltas
2050        try:
2051            substream_kind, substream = next(stream)
2052        except StopIteration:
2053            return
2054        if substream_kind != "inventory-deltas":
2055            raise AssertionError(
2056                "Unexpected stream %r received" % substream_kind)
2057        for record in substream:
2058            (parent_id, new_id, versioned_root, tree_references, invdelta) = (
2059                deserializer.parse_text_bytes(record.get_bytes_as("lines")))
2060            if parent_id != prev_inv.revision_id:
2061                raise AssertionError("invalid base %r != %r" % (parent_id,
2062                                                                prev_inv.revision_id))
2063            inv = prev_inv.create_by_apply_delta(invdelta, new_id)
2064            yield inv, inv.revision_id
2065            prev_inv = inv
2066
2067    def _iter_inventories_vfs(self, revision_ids, ordering=None):
2068        self._ensure_real()
2069        return self._real_repository._iter_inventories(revision_ids, ordering)
2070
2071    def iter_inventories(self, revision_ids, ordering=None):
2072        """Get many inventories by revision_ids.
2073
2074        This will buffer some or all of the texts used in constructing the
2075        inventories in memory, but will only parse a single inventory at a
2076        time.
2077
2078        :param revision_ids: The expected revision ids of the inventories.
2079        :param ordering: optional ordering, e.g. 'topological'.  If not
2080            specified, the order of revision_ids will be preserved (by
2081            buffering if necessary).
2082        :return: An iterator of inventories.
2083        """
2084        if ((None in revision_ids) or
2085                (_mod_revision.NULL_REVISION in revision_ids)):
2086            raise ValueError('cannot get null revision inventory')
2087        for inv, revid in self._iter_inventories(revision_ids, ordering):
2088            if inv is None:
2089                raise errors.NoSuchRevision(self, revid)
2090            yield inv
2091
2092    def _iter_inventories(self, revision_ids, ordering=None):
2093        if len(revision_ids) == 0:
2094            return
2095        missing = set(revision_ids)
2096        if ordering is None:
2097            order_as_requested = True
2098            invs = {}
2099            order = list(revision_ids)
2100            order.reverse()
2101            next_revid = order.pop()
2102        else:
2103            order_as_requested = False
2104            if ordering != 'unordered' and self._fallback_repositories:
2105                raise ValueError('unsupported ordering %r' % ordering)
2106        iter_inv_fns = [self._iter_inventories_rpc] + [
2107            fallback._iter_inventories for fallback in
2108            self._fallback_repositories]
2109        try:
2110            for iter_inv in iter_inv_fns:
2111                request = [revid for revid in revision_ids if revid in missing]
2112                for inv, revid in iter_inv(request, ordering):
2113                    if inv is None:
2114                        continue
2115                    missing.remove(inv.revision_id)
2116                    if ordering != 'unordered':
2117                        invs[revid] = inv
2118                    else:
2119                        yield inv, revid
2120                if order_as_requested:
2121                    # Yield as many results as we can while preserving order.
2122                    while next_revid in invs:
2123                        inv = invs.pop(next_revid)
2124                        yield inv, inv.revision_id
2125                        try:
2126                            next_revid = order.pop()
2127                        except IndexError:
2128                            # We still want to fully consume the stream, just
2129                            # in case it is not actually finished at this point
2130                            next_revid = None
2131                            break
2132        except errors.UnknownSmartMethod:
2133            for inv, revid in self._iter_inventories_vfs(revision_ids, ordering):
2134                yield inv, revid
2135            return
2136        # Report missing
2137        if order_as_requested:
2138            if next_revid is not None:
2139                yield None, next_revid
2140            while order:
2141                revid = order.pop()
2142                yield invs.get(revid), revid
2143        else:
2144            while missing:
2145                yield None, missing.pop()
2146
2147    def get_revision(self, revision_id):
2148        with self.lock_read():
2149            return self.get_revisions([revision_id])[0]
2150
2151    def get_transaction(self):
2152        self._ensure_real()
2153        return self._real_repository.get_transaction()
2154
2155    def clone(self, a_controldir, revision_id=None):
2156        with self.lock_read():
2157            dest_repo = self._create_sprouting_repo(
2158                a_controldir, shared=self.is_shared())
2159            self.copy_content_into(dest_repo, revision_id)
2160            return dest_repo
2161
2162    def make_working_trees(self):
2163        """See Repository.make_working_trees"""
2164        path = self.controldir._path_for_remote_call(self._client)
2165        try:
2166            response = self._call(b'Repository.make_working_trees', path)
2167        except errors.UnknownSmartMethod:
2168            self._ensure_real()
2169            return self._real_repository.make_working_trees()
2170        if response[0] not in (b'yes', b'no'):
2171            raise SmartProtocolError(
2172                'unexpected response code %s' % (response,))
2173        return response[0] == b'yes'
2174
2175    def refresh_data(self):
2176        """Re-read any data needed to synchronise with disk.
2177
2178        This method is intended to be called after another repository instance
2179        (such as one used by a smart server) has inserted data into the
2180        repository. On all repositories this will work outside of write groups.
2181        Some repository formats (pack and newer for breezy native formats)
2182        support refresh_data inside write groups. If called inside a write
2183        group on a repository that does not support refreshing in a write group
2184        IsInWriteGroupError will be raised.
2185        """
2186        if self._real_repository is not None:
2187            self._real_repository.refresh_data()
2188        # Refresh the parents cache for this object
2189        self._unstacked_provider.disable_cache()
2190        self._unstacked_provider.enable_cache()
2191
2192    def revision_ids_to_search_result(self, result_set):
2193        """Convert a set of revision ids to a graph SearchResult."""
2194        result_parents = set()
2195        for parents in self.get_graph().get_parent_map(result_set).values():
2196            result_parents.update(parents)
2197        included_keys = result_set.intersection(result_parents)
2198        start_keys = result_set.difference(included_keys)
2199        exclude_keys = result_parents.difference(result_set)
2200        result = vf_search.SearchResult(start_keys, exclude_keys,
2201                                        len(result_set), result_set)
2202        return result
2203
2204    def search_missing_revision_ids(self, other,
2205                                    find_ghosts=True, revision_ids=None, if_present_ids=None,
2206                                    limit=None):
2207        """Return the revision ids that other has that this does not.
2208
2209        These are returned in topological order.
2210
2211        revision_id: only return revision ids included by revision_id.
2212        """
2213        with self.lock_read():
2214            inter_repo = _mod_repository.InterRepository.get(other, self)
2215            return inter_repo.search_missing_revision_ids(
2216                find_ghosts=find_ghosts, revision_ids=revision_ids,
2217                if_present_ids=if_present_ids, limit=limit)
2218
2219    def fetch(self, source, revision_id=None, find_ghosts=False,
2220              fetch_spec=None, lossy=False):
2221        # No base implementation to use as RemoteRepository is not a subclass
2222        # of Repository; so this is a copy of Repository.fetch().
2223        if fetch_spec is not None and revision_id is not None:
2224            raise AssertionError(
2225                "fetch_spec and revision_id are mutually exclusive.")
2226        if self.is_in_write_group():
2227            raise errors.InternalBzrError(
2228                "May not fetch while in a write group.")
2229        # fast path same-url fetch operations
2230        if (self.has_same_location(source) and
2231            fetch_spec is None and
2232                self._has_same_fallbacks(source)):
2233            # check that last_revision is in 'from' and then return a
2234            # no-operation.
2235            if (revision_id is not None
2236                    and not _mod_revision.is_null(revision_id)):
2237                self.get_revision(revision_id)
2238            return _mod_repository.FetchResult(0)
2239        # if there is no specific appropriate InterRepository, this will get
2240        # the InterRepository base class, which raises an
2241        # IncompatibleRepositories when asked to fetch.
2242        inter = _mod_repository.InterRepository.get(source, self)
2243        if (fetch_spec is not None
2244                and not getattr(inter, "supports_fetch_spec", False)):
2245            raise errors.UnsupportedOperation(
2246                "fetch_spec not supported for %r" % inter)
2247        return inter.fetch(revision_id=revision_id,
2248                           find_ghosts=find_ghosts, fetch_spec=fetch_spec,
2249                           lossy=lossy)
2250
2251    def create_bundle(self, target, base, fileobj, format=None):
2252        self._ensure_real()
2253        self._real_repository.create_bundle(target, base, fileobj, format)
2254
2255    def fileids_altered_by_revision_ids(self, revision_ids):
2256        self._ensure_real()
2257        return self._real_repository.fileids_altered_by_revision_ids(revision_ids)
2258
2259    def _get_versioned_file_checker(self, revisions, revision_versions_cache):
2260        self._ensure_real()
2261        return self._real_repository._get_versioned_file_checker(
2262            revisions, revision_versions_cache)
2263
2264    def _iter_files_bytes_rpc(self, desired_files, absent):
2265        path = self.controldir._path_for_remote_call(self._client)
2266        lines = []
2267        identifiers = []
2268        for (file_id, revid, identifier) in desired_files:
2269            lines.append(b''.join([
2270                file_id,
2271                b'\0',
2272                revid]))
2273            identifiers.append(identifier)
2274        (response_tuple, response_handler) = (
2275            self._call_with_body_bytes_expecting_body(
2276                b"Repository.iter_files_bytes", (path, ), b"\n".join(lines)))
2277        if response_tuple != (b'ok', ):
2278            response_handler.cancel_read_body()
2279            raise errors.UnexpectedSmartServerResponse(response_tuple)
2280        byte_stream = response_handler.read_streamed_body()
2281
2282        def decompress_stream(start, byte_stream, unused):
2283            decompressor = zlib.decompressobj()
2284            yield decompressor.decompress(start)
2285            while decompressor.unused_data == b"":
2286                try:
2287                    data = next(byte_stream)
2288                except StopIteration:
2289                    break
2290                yield decompressor.decompress(data)
2291            yield decompressor.flush()
2292            unused.append(decompressor.unused_data)
2293        unused = b""
2294        while True:
2295            while b"\n" not in unused:
2296                try:
2297                    unused += next(byte_stream)
2298                except StopIteration:
2299                    return
2300            header, rest = unused.split(b"\n", 1)
2301            args = header.split(b"\0")
2302            if args[0] == b"absent":
2303                absent[identifiers[int(args[3])]] = (args[1], args[2])
2304                unused = rest
2305                continue
2306            elif args[0] == b"ok":
2307                idx = int(args[1])
2308            else:
2309                raise errors.UnexpectedSmartServerResponse(args)
2310            unused_chunks = []
2311            yield (identifiers[idx],
2312                   decompress_stream(rest, byte_stream, unused_chunks))
2313            unused = b"".join(unused_chunks)
2314
2315    def iter_files_bytes(self, desired_files):
2316        """See Repository.iter_file_bytes.
2317        """
2318        try:
2319            absent = {}
2320            for (identifier, bytes_iterator) in self._iter_files_bytes_rpc(
2321                    desired_files, absent):
2322                yield identifier, bytes_iterator
2323            for fallback in self._fallback_repositories:
2324                if not absent:
2325                    break
2326                desired_files = [(key[0], key[1], identifier)
2327                                 for identifier, key in absent.items()]
2328                for (identifier, bytes_iterator) in fallback.iter_files_bytes(desired_files):
2329                    del absent[identifier]
2330                    yield identifier, bytes_iterator
2331            if absent:
2332                # There may be more missing items, but raise an exception
2333                # for just one.
2334                missing_identifier = next(iter(absent))
2335                missing_key = absent[missing_identifier]
2336                raise errors.RevisionNotPresent(revision_id=missing_key[1],
2337                                                file_id=missing_key[0])
2338        except errors.UnknownSmartMethod:
2339            self._ensure_real()
2340            for (identifier, bytes_iterator) in (
2341                    self._real_repository.iter_files_bytes(desired_files)):
2342                yield identifier, bytes_iterator
2343
2344    def get_cached_parent_map(self, revision_ids):
2345        """See breezy.CachingParentsProvider.get_cached_parent_map"""
2346        return self._unstacked_provider.get_cached_parent_map(revision_ids)
2347
2348    def get_parent_map(self, revision_ids):
2349        """See breezy.Graph.get_parent_map()."""
2350        return self._make_parents_provider().get_parent_map(revision_ids)
2351
2352    def _get_parent_map_rpc(self, keys):
2353        """Helper for get_parent_map that performs the RPC."""
2354        medium = self._client._medium
2355        if medium._is_remote_before((1, 2)):
2356            # We already found out that the server can't understand
2357            # Repository.get_parent_map requests, so just fetch the whole
2358            # graph.
2359            #
2360            # Note that this reads the whole graph, when only some keys are
2361            # wanted.  On this old server there's no way (?) to get them all
2362            # in one go, and the user probably will have seen a warning about
2363            # the server being old anyhow.
2364            rg = self._get_revision_graph(None)
2365            # There is an API discrepancy between get_parent_map and
2366            # get_revision_graph. Specifically, a "key:()" pair in
2367            # get_revision_graph just means a node has no parents. For
2368            # "get_parent_map" it means the node is a ghost. So fix up the
2369            # graph to correct this.
2370            #   https://bugs.launchpad.net/bzr/+bug/214894
2371            # There is one other "bug" which is that ghosts in
2372            # get_revision_graph() are not returned at all. But we won't worry
2373            # about that for now.
2374            for node_id, parent_ids in rg.items():
2375                if parent_ids == ():
2376                    rg[node_id] = (NULL_REVISION,)
2377            rg[NULL_REVISION] = ()
2378            return rg
2379
2380        keys = set(keys)
2381        if None in keys:
2382            raise ValueError('get_parent_map(None) is not valid')
2383        if NULL_REVISION in keys:
2384            keys.discard(NULL_REVISION)
2385            found_parents = {NULL_REVISION: ()}
2386            if not keys:
2387                return found_parents
2388        else:
2389            found_parents = {}
2390        # TODO(Needs analysis): We could assume that the keys being requested
2391        # from get_parent_map are in a breadth first search, so typically they
2392        # will all be depth N from some common parent, and we don't have to
2393        # have the server iterate from the root parent, but rather from the
2394        # keys we're searching; and just tell the server the keyspace we
2395        # already have; but this may be more traffic again.
2396
2397        # Transform self._parents_map into a search request recipe.
2398        # TODO: Manage this incrementally to avoid covering the same path
2399        # repeatedly. (The server will have to on each request, but the less
2400        # work done the better).
2401        #
2402        # Negative caching notes:
2403        # new server sends missing when a request including the revid
2404        # 'include-missing:' is present in the request.
2405        # missing keys are serialised as missing:X, and we then call
2406        # provider.note_missing(X) for-all X
2407        parents_map = self._unstacked_provider.get_cached_map()
2408        if parents_map is None:
2409            # Repository is not locked, so there's no cache.
2410            parents_map = {}
2411        if _DEFAULT_SEARCH_DEPTH <= 0:
2412            (start_set, stop_keys,
2413             key_count) = vf_search.search_result_from_parent_map(
2414                parents_map, self._unstacked_provider.missing_keys)
2415        else:
2416            (start_set, stop_keys,
2417             key_count) = vf_search.limited_search_result_from_parent_map(
2418                parents_map, self._unstacked_provider.missing_keys,
2419                keys, depth=_DEFAULT_SEARCH_DEPTH)
2420        recipe = ('manual', start_set, stop_keys, key_count)
2421        body = self._serialise_search_recipe(recipe)
2422        path = self.controldir._path_for_remote_call(self._client)
2423        for key in keys:
2424            if not isinstance(key, bytes):
2425                raise ValueError(
2426                    "key %r not a bytes string" % (key,))
2427        verb = b'Repository.get_parent_map'
2428        args = (path, b'include-missing:') + tuple(keys)
2429        try:
2430            response = self._call_with_body_bytes_expecting_body(
2431                verb, args, body)
2432        except errors.UnknownSmartMethod:
2433            # Server does not support this method, so get the whole graph.
2434            # Worse, we have to force a disconnection, because the server now
2435            # doesn't realise it has a body on the wire to consume, so the
2436            # only way to recover is to abandon the connection.
2437            warning(
2438                'Server is too old for fast get_parent_map, reconnecting.  '
2439                '(Upgrade the server to Bazaar 1.2 to avoid this)')
2440            medium.disconnect()
2441            # To avoid having to disconnect repeatedly, we keep track of the
2442            # fact the server doesn't understand remote methods added in 1.2.
2443            medium._remember_remote_is_before((1, 2))
2444            # Recurse just once and we should use the fallback code.
2445            return self._get_parent_map_rpc(keys)
2446        response_tuple, response_handler = response
2447        if response_tuple[0] not in [b'ok']:
2448            response_handler.cancel_read_body()
2449            raise errors.UnexpectedSmartServerResponse(response_tuple)
2450        if response_tuple[0] == b'ok':
2451            coded = bz2.decompress(response_handler.read_body_bytes())
2452            if coded == b'':
2453                # no revisions found
2454                return {}
2455            lines = coded.split(b'\n')
2456            revision_graph = {}
2457            for line in lines:
2458                d = tuple(line.split())
2459                if len(d) > 1:
2460                    revision_graph[d[0]] = d[1:]
2461                else:
2462                    # No parents:
2463                    if d[0].startswith(b'missing:'):
2464                        revid = d[0][8:]
2465                        self._unstacked_provider.note_missing_key(revid)
2466                    else:
2467                        # no parents - so give the Graph result
2468                        # (NULL_REVISION,).
2469                        revision_graph[d[0]] = (NULL_REVISION,)
2470            return revision_graph
2471
2472    def get_signature_text(self, revision_id):
2473        with self.lock_read():
2474            path = self.controldir._path_for_remote_call(self._client)
2475            try:
2476                response_tuple, response_handler = self._call_expecting_body(
2477                    b'Repository.get_revision_signature_text', path, revision_id)
2478            except errors.UnknownSmartMethod:
2479                self._ensure_real()
2480                return self._real_repository.get_signature_text(revision_id)
2481            except errors.NoSuchRevision as err:
2482                for fallback in self._fallback_repositories:
2483                    try:
2484                        return fallback.get_signature_text(revision_id)
2485                    except errors.NoSuchRevision:
2486                        pass
2487                raise err
2488            else:
2489                if response_tuple[0] != b'ok':
2490                    raise errors.UnexpectedSmartServerResponse(response_tuple)
2491                return response_handler.read_body_bytes()
2492
2493    def _get_inventory_xml(self, revision_id):
2494        with self.lock_read():
2495            # This call is used by older working tree formats,
2496            # which stored a serialized basis inventory.
2497            self._ensure_real()
2498            return self._real_repository._get_inventory_xml(revision_id)
2499
2500    def reconcile(self, other=None, thorough=False):
2501        from ..reconcile import ReconcileResult
2502        with self.lock_write():
2503            path = self.controldir._path_for_remote_call(self._client)
2504            try:
2505                response, handler = self._call_expecting_body(
2506                    b'Repository.reconcile', path, self._lock_token)
2507            except (errors.UnknownSmartMethod, errors.TokenLockingNotSupported):
2508                self._ensure_real()
2509                return self._real_repository.reconcile(other=other, thorough=thorough)
2510            if response != (b'ok', ):
2511                raise errors.UnexpectedSmartServerResponse(response)
2512            body = handler.read_body_bytes()
2513            result = ReconcileResult()
2514            result.garbage_inventories = None
2515            result.inconsistent_parents = None
2516            result.aborted = None
2517            for line in body.split(b'\n'):
2518                if not line:
2519                    continue
2520                key, val_text = line.split(b':')
2521                if key == b"garbage_inventories":
2522                    result.garbage_inventories = int(val_text)
2523                elif key == b"inconsistent_parents":
2524                    result.inconsistent_parents = int(val_text)
2525                else:
2526                    mutter("unknown reconcile key %r" % key)
2527            return result
2528
2529    def all_revision_ids(self):
2530        path = self.controldir._path_for_remote_call(self._client)
2531        try:
2532            response_tuple, response_handler = self._call_expecting_body(
2533                b"Repository.all_revision_ids", path)
2534        except errors.UnknownSmartMethod:
2535            self._ensure_real()
2536            return self._real_repository.all_revision_ids()
2537        if response_tuple != (b"ok", ):
2538            raise errors.UnexpectedSmartServerResponse(response_tuple)
2539        revids = set(response_handler.read_body_bytes().splitlines())
2540        for fallback in self._fallback_repositories:
2541            revids.update(set(fallback.all_revision_ids()))
2542        return list(revids)
2543
2544    def _filtered_revision_trees(self, revision_ids, file_ids):
2545        """Return Tree for a revision on this branch with only some files.
2546
2547        :param revision_ids: a sequence of revision-ids;
2548          a revision-id may not be None or b'null:'
2549        :param file_ids: if not None, the result is filtered
2550          so that only those file-ids, their parents and their
2551          children are included.
2552        """
2553        inventories = self.iter_inventories(revision_ids)
2554        for inv in inventories:
2555            # Should we introduce a FilteredRevisionTree class rather
2556            # than pre-filter the inventory here?
2557            filtered_inv = inv.filter(file_ids)
2558            yield InventoryRevisionTree(self, filtered_inv, filtered_inv.revision_id)
2559
2560    def get_revision_delta(self, revision_id):
2561        with self.lock_read():
2562            r = self.get_revision(revision_id)
2563            return list(self.get_revision_deltas([r]))[0]
2564
2565    def revision_trees(self, revision_ids):
2566        with self.lock_read():
2567            inventories = self.iter_inventories(revision_ids)
2568            for inv in inventories:
2569                yield RemoteInventoryTree(self, inv, inv.revision_id)
2570
2571    def get_revision_reconcile(self, revision_id):
2572        with self.lock_read():
2573            self._ensure_real()
2574            return self._real_repository.get_revision_reconcile(revision_id)
2575
2576    def check(self, revision_ids=None, callback_refs=None, check_repo=True):
2577        with self.lock_read():
2578            self._ensure_real()
2579            return self._real_repository.check(revision_ids=revision_ids,
2580                                               callback_refs=callback_refs, check_repo=check_repo)
2581
2582    def copy_content_into(self, destination, revision_id=None):
2583        """Make a complete copy of the content in self into destination.
2584
2585        This is a destructive operation! Do not use it on existing
2586        repositories.
2587        """
2588        interrepo = _mod_repository.InterRepository.get(self, destination)
2589        return interrepo.copy_content(revision_id)
2590
2591    def _copy_repository_tarball(self, to_bzrdir, revision_id=None):
2592        # get a tarball of the remote repository, and copy from that into the
2593        # destination
2594        import tarfile
2595        # TODO: Maybe a progress bar while streaming the tarball?
2596        note(gettext("Copying repository content as tarball..."))
2597        tar_file = self._get_tarball('bz2')
2598        if tar_file is None:
2599            return None
2600        destination = to_bzrdir.create_repository()
2601        try:
2602            tar = tarfile.open('repository', fileobj=tar_file,
2603                               mode='r|bz2')
2604            tmpdir = osutils.mkdtemp()
2605            try:
2606                tar.extractall(tmpdir)
2607                tmp_bzrdir = _mod_bzrdir.BzrDir.open(tmpdir)
2608                tmp_repo = tmp_bzrdir.open_repository()
2609                tmp_repo.copy_content_into(destination, revision_id)
2610            finally:
2611                osutils.rmtree(tmpdir)
2612        finally:
2613            tar_file.close()
2614        return destination
2615        # TODO: Suggestion from john: using external tar is much faster than
2616        # python's tarfile library, but it may not work on windows.
2617
2618    @property
2619    def inventories(self):
2620        """Decorate the real repository for now.
2621
2622        In the long term a full blown network facility is needed to
2623        avoid creating a real repository object locally.
2624        """
2625        self._ensure_real()
2626        return self._real_repository.inventories
2627
2628    def pack(self, hint=None, clean_obsolete_packs=False):
2629        """Compress the data within the repository.
2630        """
2631        if hint is None:
2632            body = b""
2633        else:
2634            body = b"".join([l.encode('ascii') + b"\n" for l in hint])
2635        with self.lock_write():
2636            path = self.controldir._path_for_remote_call(self._client)
2637            try:
2638                response, handler = self._call_with_body_bytes_expecting_body(
2639                    b'Repository.pack', (path, self._lock_token,
2640                                         str(clean_obsolete_packs).encode('ascii')), body)
2641            except errors.UnknownSmartMethod:
2642                self._ensure_real()
2643                return self._real_repository.pack(hint=hint,
2644                                                  clean_obsolete_packs=clean_obsolete_packs)
2645            handler.cancel_read_body()
2646            if response != (b'ok', ):
2647                raise errors.UnexpectedSmartServerResponse(response)
2648
2649    @property
2650    def revisions(self):
2651        """Decorate the real repository for now.
2652
2653        In the long term a full blown network facility is needed.
2654        """
2655        self._ensure_real()
2656        return self._real_repository.revisions
2657
2658    def set_make_working_trees(self, new_value):
2659        if new_value:
2660            new_value_str = b"True"
2661        else:
2662            new_value_str = b"False"
2663        path = self.controldir._path_for_remote_call(self._client)
2664        try:
2665            response = self._call(
2666                b'Repository.set_make_working_trees', path, new_value_str)
2667        except errors.UnknownSmartMethod:
2668            self._ensure_real()
2669            self._real_repository.set_make_working_trees(new_value)
2670        else:
2671            if response[0] != b'ok':
2672                raise errors.UnexpectedSmartServerResponse(response)
2673
2674    @property
2675    def signatures(self):
2676        """Decorate the real repository for now.
2677
2678        In the long term a full blown network facility is needed to avoid
2679        creating a real repository object locally.
2680        """
2681        self._ensure_real()
2682        return self._real_repository.signatures
2683
2684    def sign_revision(self, revision_id, gpg_strategy):
2685        with self.lock_write():
2686            testament = _mod_testament.Testament.from_revision(
2687                self, revision_id)
2688            plaintext = testament.as_short_text()
2689            self.store_revision_signature(gpg_strategy, plaintext, revision_id)
2690
2691    @property
2692    def texts(self):
2693        """Decorate the real repository for now.
2694
2695        In the long term a full blown network facility is needed to avoid
2696        creating a real repository object locally.
2697        """
2698        self._ensure_real()
2699        return self._real_repository.texts
2700
2701    def _iter_revisions_rpc(self, revision_ids):
2702        body = b"\n".join(revision_ids)
2703        path = self.controldir._path_for_remote_call(self._client)
2704        response_tuple, response_handler = (
2705            self._call_with_body_bytes_expecting_body(
2706                b"Repository.iter_revisions", (path, ), body))
2707        if response_tuple[0] != b"ok":
2708            raise errors.UnexpectedSmartServerResponse(response_tuple)
2709        serializer_format = response_tuple[1].decode('ascii')
2710        serializer = serializer_format_registry.get(serializer_format)
2711        byte_stream = response_handler.read_streamed_body()
2712        decompressor = zlib.decompressobj()
2713        chunks = []
2714        for bytes in byte_stream:
2715            chunks.append(decompressor.decompress(bytes))
2716            if decompressor.unused_data != b"":
2717                chunks.append(decompressor.flush())
2718                yield serializer.read_revision_from_string(b"".join(chunks))
2719                unused = decompressor.unused_data
2720                decompressor = zlib.decompressobj()
2721                chunks = [decompressor.decompress(unused)]
2722        chunks.append(decompressor.flush())
2723        text = b"".join(chunks)
2724        if text != b"":
2725            yield serializer.read_revision_from_string(b"".join(chunks))
2726
2727    def iter_revisions(self, revision_ids):
2728        for rev_id in revision_ids:
2729            if not rev_id or not isinstance(rev_id, bytes):
2730                raise errors.InvalidRevisionId(
2731                    revision_id=rev_id, branch=self)
2732        with self.lock_read():
2733            try:
2734                missing = set(revision_ids)
2735                for rev in self._iter_revisions_rpc(revision_ids):
2736                    missing.remove(rev.revision_id)
2737                    yield (rev.revision_id, rev)
2738                for fallback in self._fallback_repositories:
2739                    if not missing:
2740                        break
2741                    for (revid, rev) in fallback.iter_revisions(missing):
2742                        if rev is not None:
2743                            yield (revid, rev)
2744                            missing.remove(revid)
2745                for revid in missing:
2746                    yield (revid, None)
2747            except errors.UnknownSmartMethod:
2748                self._ensure_real()
2749                for entry in self._real_repository.iter_revisions(revision_ids):
2750                    yield entry
2751
2752    def supports_rich_root(self):
2753        return self._format.rich_root_data
2754
2755    @property
2756    def _serializer(self):
2757        return self._format._serializer
2758
2759    def store_revision_signature(self, gpg_strategy, plaintext, revision_id):
2760        with self.lock_write():
2761            signature = gpg_strategy.sign(plaintext, gpg.MODE_CLEAR)
2762            self.add_signature_text(revision_id, signature)
2763
2764    def add_signature_text(self, revision_id, signature):
2765        if self._real_repository:
2766            # If there is a real repository the write group will
2767            # be in the real repository as well, so use that:
2768            self._ensure_real()
2769            return self._real_repository.add_signature_text(
2770                revision_id, signature)
2771        path = self.controldir._path_for_remote_call(self._client)
2772        response, handler = self._call_with_body_bytes_expecting_body(
2773            b'Repository.add_signature_text', (path, self._lock_token,
2774                                               revision_id) +
2775            tuple([token.encode('utf-8')
2776                   for token in self._write_group_tokens]),
2777            signature)
2778        handler.cancel_read_body()
2779        self.refresh_data()
2780        if response[0] != b'ok':
2781            raise errors.UnexpectedSmartServerResponse(response)
2782        self._write_group_tokens = [token.decode(
2783            'utf-8') for token in response[1:]]
2784
2785    def has_signature_for_revision_id(self, revision_id):
2786        path = self.controldir._path_for_remote_call(self._client)
2787        try:
2788            response = self._call(b'Repository.has_signature_for_revision_id',
2789                                  path, revision_id)
2790        except errors.UnknownSmartMethod:
2791            self._ensure_real()
2792            return self._real_repository.has_signature_for_revision_id(
2793                revision_id)
2794        if response[0] not in (b'yes', b'no'):
2795            raise SmartProtocolError(
2796                'unexpected response code %s' % (response,))
2797        if response[0] == b'yes':
2798            return True
2799        for fallback in self._fallback_repositories:
2800            if fallback.has_signature_for_revision_id(revision_id):
2801                return True
2802        return False
2803
2804    def verify_revision_signature(self, revision_id, gpg_strategy):
2805        with self.lock_read():
2806            if not self.has_signature_for_revision_id(revision_id):
2807                return gpg.SIGNATURE_NOT_SIGNED, None
2808            signature = self.get_signature_text(revision_id)
2809
2810            testament = _mod_testament.Testament.from_revision(
2811                self, revision_id)
2812
2813            (status, key, signed_plaintext) = gpg_strategy.verify(signature)
2814            if testament.as_short_text() != signed_plaintext:
2815                return gpg.SIGNATURE_NOT_VALID, None
2816            return (status, key)
2817
2818    def item_keys_introduced_by(self, revision_ids, _files_pb=None):
2819        self._ensure_real()
2820        return self._real_repository.item_keys_introduced_by(revision_ids,
2821                                                             _files_pb=_files_pb)
2822
2823    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
2824        self._ensure_real()
2825        return self._real_repository._find_inconsistent_revision_parents(
2826            revisions_iterator)
2827
2828    def _check_for_inconsistent_revision_parents(self):
2829        self._ensure_real()
2830        return self._real_repository._check_for_inconsistent_revision_parents()
2831
2832    def _make_parents_provider(self, other=None):
2833        providers = [self._unstacked_provider]
2834        if other is not None:
2835            providers.insert(0, other)
2836        return graph.StackedParentsProvider(_LazyListJoin(
2837            providers, self._fallback_repositories))
2838
2839    def _serialise_search_recipe(self, recipe):
2840        """Serialise a graph search recipe.
2841
2842        :param recipe: A search recipe (start, stop, count).
2843        :return: Serialised bytes.
2844        """
2845        start_keys = b' '.join(recipe[1])
2846        stop_keys = b' '.join(recipe[2])
2847        count = str(recipe[3]).encode('ascii')
2848        return b'\n'.join((start_keys, stop_keys, count))
2849
2850    def _serialise_search_result(self, search_result):
2851        parts = search_result.get_network_struct()
2852        return b'\n'.join(parts)
2853
2854    def autopack(self):
2855        path = self.controldir._path_for_remote_call(self._client)
2856        try:
2857            response = self._call(b'PackRepository.autopack', path)
2858        except errors.UnknownSmartMethod:
2859            self._ensure_real()
2860            self._real_repository._pack_collection.autopack()
2861            return
2862        self.refresh_data()
2863        if response[0] != b'ok':
2864            raise errors.UnexpectedSmartServerResponse(response)
2865
2866    def _revision_archive(self, revision_id, format, name, root, subdir,
2867                          force_mtime=None):
2868        path = self.controldir._path_for_remote_call(self._client)
2869        format = format or ''
2870        root = root or ''
2871        subdir = subdir or ''
2872        force_mtime = int(force_mtime) if force_mtime is not None else None
2873        try:
2874            response, protocol = self._call_expecting_body(
2875                b'Repository.revision_archive', path,
2876                revision_id,
2877                format.encode('ascii'),
2878                os.path.basename(name).encode('utf-8'),
2879                root.encode('utf-8'),
2880                subdir.encode('utf-8'),
2881                force_mtime)
2882        except errors.UnknownSmartMethod:
2883            return None
2884        if response[0] == b'ok':
2885            return iter([protocol.read_body_bytes()])
2886        raise errors.UnexpectedSmartServerResponse(response)
2887
2888    def _annotate_file_revision(self, revid, tree_path, file_id, default_revision):
2889        path = self.controldir._path_for_remote_call(self._client)
2890        tree_path = tree_path.encode('utf-8')
2891        file_id = file_id or b''
2892        default_revision = default_revision or b''
2893        try:
2894            response, handler = self._call_expecting_body(
2895                b'Repository.annotate_file_revision', path,
2896                revid, tree_path, file_id, default_revision)
2897        except errors.UnknownSmartMethod:
2898            return None
2899        if response[0] != b'ok':
2900            raise errors.UnexpectedSmartServerResponse(response)
2901        return map(tuple, bencode.bdecode(handler.read_body_bytes()))
2902
2903
2904class RemoteStreamSink(vf_repository.StreamSink):
2905
2906    def _insert_real(self, stream, src_format, resume_tokens):
2907        self.target_repo._ensure_real()
2908        sink = self.target_repo._real_repository._get_sink()
2909        result = sink.insert_stream(stream, src_format, resume_tokens)
2910        if not result:
2911            self.target_repo.autopack()
2912        return result
2913
2914    def insert_missing_keys(self, source, missing_keys):
2915        if (isinstance(source, RemoteStreamSource)
2916                and source.from_repository._client._medium == self.target_repo._client._medium):
2917            # Streaming from and to the same medium is tricky, since we don't support
2918            # more than one concurrent request. For now, just force VFS.
2919            stream = source._get_real_stream_for_missing_keys(missing_keys)
2920        else:
2921            stream = source.get_stream_for_missing_keys(missing_keys)
2922        return self.insert_stream_without_locking(stream,
2923                                                  self.target_repo._format)
2924
2925    def insert_stream(self, stream, src_format, resume_tokens):
2926        target = self.target_repo
2927        target._unstacked_provider.missing_keys.clear()
2928        candidate_calls = [(b'Repository.insert_stream_1.19', (1, 19))]
2929        if target._lock_token:
2930            candidate_calls.append(
2931                (b'Repository.insert_stream_locked', (1, 14)))
2932            lock_args = (target._lock_token or b'',)
2933        else:
2934            candidate_calls.append((b'Repository.insert_stream', (1, 13)))
2935            lock_args = ()
2936        client = target._client
2937        medium = client._medium
2938        path = target.controldir._path_for_remote_call(client)
2939        # Probe for the verb to use with an empty stream before sending the
2940        # real stream to it.  We do this both to avoid the risk of sending a
2941        # large request that is then rejected, and because we don't want to
2942        # implement a way to buffer, rewind, or restart the stream.
2943        found_verb = False
2944        for verb, required_version in candidate_calls:
2945            if medium._is_remote_before(required_version):
2946                continue
2947            if resume_tokens:
2948                # We've already done the probing (and set _is_remote_before) on
2949                # a previous insert.
2950                found_verb = True
2951                break
2952            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
2953            try:
2954                response = client.call_with_body_stream(
2955                    (verb, path, b'') + lock_args, byte_stream)
2956            except errors.UnknownSmartMethod:
2957                medium._remember_remote_is_before(required_version)
2958            else:
2959                found_verb = True
2960                break
2961        if not found_verb:
2962            # Have to use VFS.
2963            return self._insert_real(stream, src_format, resume_tokens)
2964        self._last_inv_record = None
2965        self._last_substream = None
2966        if required_version < (1, 19):
2967            # Remote side doesn't support inventory deltas.  Wrap the stream to
2968            # make sure we don't send any.  If the stream contains inventory
2969            # deltas we'll interrupt the smart insert_stream request and
2970            # fallback to VFS.
2971            stream = self._stop_stream_if_inventory_delta(stream)
2972        byte_stream = smart_repo._stream_to_byte_stream(
2973            stream, src_format)
2974        resume_tokens = b' '.join([token.encode('utf-8')
2975                                   for token in resume_tokens])
2976        response = client.call_with_body_stream(
2977            (verb, path, resume_tokens) + lock_args, byte_stream)
2978        if response[0][0] not in (b'ok', b'missing-basis'):
2979            raise errors.UnexpectedSmartServerResponse(response)
2980        if self._last_substream is not None:
2981            # The stream included an inventory-delta record, but the remote
2982            # side isn't new enough to support them.  So we need to send the
2983            # rest of the stream via VFS.
2984            self.target_repo.refresh_data()
2985            return self._resume_stream_with_vfs(response, src_format)
2986        if response[0][0] == b'missing-basis':
2987            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
2988            resume_tokens = [token.decode('utf-8') for token in tokens]
2989            return resume_tokens, set((entry[0].decode('utf-8'), ) + entry[1:] for entry in missing_keys)
2990        else:
2991            self.target_repo.refresh_data()
2992            return [], set()
2993
2994    def _resume_stream_with_vfs(self, response, src_format):
2995        """Resume sending a stream via VFS, first resending the record and
2996        substream that couldn't be sent via an insert_stream verb.
2997        """
2998        if response[0][0] == b'missing-basis':
2999            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
3000            tokens = [token.decode('utf-8') for token in tokens]
3001            # Ignore missing_keys, we haven't finished inserting yet
3002        else:
3003            tokens = []
3004
3005        def resume_substream():
3006            # Yield the substream that was interrupted.
3007            for record in self._last_substream:
3008                yield record
3009            self._last_substream = None
3010
3011        def resume_stream():
3012            # Finish sending the interrupted substream
3013            yield ('inventory-deltas', resume_substream())
3014            # Then simply continue sending the rest of the stream.
3015            for substream_kind, substream in self._last_stream:
3016                yield substream_kind, substream
3017        return self._insert_real(resume_stream(), src_format, tokens)
3018
3019    def _stop_stream_if_inventory_delta(self, stream):
3020        """Normally this just lets the original stream pass-through unchanged.
3021
3022        However if any 'inventory-deltas' substream occurs it will stop
3023        streaming, and store the interrupted substream and stream in
3024        self._last_substream and self._last_stream so that the stream can be
3025        resumed by _resume_stream_with_vfs.
3026        """
3027
3028        stream_iter = iter(stream)
3029        for substream_kind, substream in stream_iter:
3030            if substream_kind == 'inventory-deltas':
3031                self._last_substream = substream
3032                self._last_stream = stream_iter
3033                return
3034            else:
3035                yield substream_kind, substream
3036
3037
3038class RemoteStreamSource(vf_repository.StreamSource):
3039    """Stream data from a remote server."""
3040
3041    def get_stream(self, search):
3042        if (self.from_repository._fallback_repositories
3043                and self.to_format._fetch_order == 'topological'):
3044            return self._real_stream(self.from_repository, search)
3045        sources = []
3046        seen = set()
3047        repos = [self.from_repository]
3048        while repos:
3049            repo = repos.pop(0)
3050            if repo in seen:
3051                continue
3052            seen.add(repo)
3053            repos.extend(repo._fallback_repositories)
3054            sources.append(repo)
3055        return self.missing_parents_chain(search, sources)
3056
3057    def _get_real_stream_for_missing_keys(self, missing_keys):
3058        self.from_repository._ensure_real()
3059        real_repo = self.from_repository._real_repository
3060        real_source = real_repo._get_source(self.to_format)
3061        return real_source.get_stream_for_missing_keys(missing_keys)
3062
3063    def get_stream_for_missing_keys(self, missing_keys):
3064        if not isinstance(self.from_repository, RemoteRepository):
3065            return self._get_real_stream_for_missing_keys(missing_keys)
3066        client = self.from_repository._client
3067        medium = client._medium
3068        if medium._is_remote_before((3, 0)):
3069            return self._get_real_stream_for_missing_keys(missing_keys)
3070        path = self.from_repository.controldir._path_for_remote_call(client)
3071        args = (path, self.to_format.network_name())
3072        search_bytes = b'\n'.join(
3073            [b'%s\t%s' % (key[0].encode('utf-8'), key[1]) for key in missing_keys])
3074        try:
3075            response, handler = self.from_repository._call_with_body_bytes_expecting_body(
3076                b'Repository.get_stream_for_missing_keys', args, search_bytes)
3077        except (errors.UnknownSmartMethod, errors.UnknownFormatError):
3078            return self._get_real_stream_for_missing_keys(missing_keys)
3079        if response[0] != b'ok':
3080            raise errors.UnexpectedSmartServerResponse(response)
3081        byte_stream = handler.read_streamed_body()
3082        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream,
3083                                                               self._record_counter)
3084        if src_format.network_name() != self.from_repository._format.network_name():
3085            raise AssertionError(
3086                "Mismatched RemoteRepository and stream src %r, %r" % (
3087                    src_format.network_name(), repo._format.network_name()))
3088        return stream
3089
3090    def _real_stream(self, repo, search):
3091        """Get a stream for search from repo.
3092
3093        This never called RemoteStreamSource.get_stream, and is a helper
3094        for RemoteStreamSource._get_stream to allow getting a stream
3095        reliably whether fallback back because of old servers or trying
3096        to stream from a non-RemoteRepository (which the stacked support
3097        code will do).
3098        """
3099        source = repo._get_source(self.to_format)
3100        if isinstance(source, RemoteStreamSource):
3101            repo._ensure_real()
3102            source = repo._real_repository._get_source(self.to_format)
3103        return source.get_stream(search)
3104
3105    def _get_stream(self, repo, search):
3106        """Core worker to get a stream from repo for search.
3107
3108        This is used by both get_stream and the stacking support logic. It
3109        deliberately gets a stream for repo which does not need to be
3110        self.from_repository. In the event that repo is not Remote, or
3111        cannot do a smart stream, a fallback is made to the generic
3112        repository._get_stream() interface, via self._real_stream.
3113
3114        In the event of stacking, streams from _get_stream will not
3115        contain all the data for search - this is normal (see get_stream).
3116
3117        :param repo: A repository.
3118        :param search: A search.
3119        """
3120        # Fallbacks may be non-smart
3121        if not isinstance(repo, RemoteRepository):
3122            return self._real_stream(repo, search)
3123        client = repo._client
3124        medium = client._medium
3125        path = repo.controldir._path_for_remote_call(client)
3126        search_bytes = repo._serialise_search_result(search)
3127        args = (path, self.to_format.network_name())
3128        candidate_verbs = [
3129            (b'Repository.get_stream_1.19', (1, 19)),
3130            (b'Repository.get_stream', (1, 13))]
3131
3132        found_verb = False
3133        for verb, version in candidate_verbs:
3134            if medium._is_remote_before(version):
3135                continue
3136            try:
3137                response = repo._call_with_body_bytes_expecting_body(
3138                    verb, args, search_bytes)
3139            except errors.UnknownSmartMethod:
3140                medium._remember_remote_is_before(version)
3141            except errors.UnknownErrorFromSmartServer as e:
3142                if isinstance(search, vf_search.EverythingResult):
3143                    error_verb = e.error_from_smart_server.error_verb
3144                    if error_verb == b'BadSearch':
3145                        # Pre-2.4 servers don't support this sort of search.
3146                        # XXX: perhaps falling back to VFS on BadSearch is a
3147                        # good idea in general?  It might provide a little bit
3148                        # of protection against client-side bugs.
3149                        medium._remember_remote_is_before((2, 4))
3150                        break
3151                raise
3152            else:
3153                response_tuple, response_handler = response
3154                found_verb = True
3155                break
3156        if not found_verb:
3157            return self._real_stream(repo, search)
3158        if response_tuple[0] != b'ok':
3159            raise errors.UnexpectedSmartServerResponse(response_tuple)
3160        byte_stream = response_handler.read_streamed_body()
3161        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream,
3162                                                               self._record_counter)
3163        if src_format.network_name() != repo._format.network_name():
3164            raise AssertionError(
3165                "Mismatched RemoteRepository and stream src %r, %r" % (
3166                    src_format.network_name(), repo._format.network_name()))
3167        return stream
3168
3169    def missing_parents_chain(self, search, sources):
3170        """Chain multiple streams together to handle stacking.
3171
3172        :param search: The overall search to satisfy with streams.
3173        :param sources: A list of Repository objects to query.
3174        """
3175        self.from_serialiser = self.from_repository._format._serializer
3176        self.seen_revs = set()
3177        self.referenced_revs = set()
3178        # If there are heads in the search, or the key count is > 0, we are not
3179        # done.
3180        while not search.is_empty() and len(sources) > 1:
3181            source = sources.pop(0)
3182            stream = self._get_stream(source, search)
3183            for kind, substream in stream:
3184                if kind != 'revisions':
3185                    yield kind, substream
3186                else:
3187                    yield kind, self.missing_parents_rev_handler(substream)
3188            search = search.refine(self.seen_revs, self.referenced_revs)
3189            self.seen_revs = set()
3190            self.referenced_revs = set()
3191        if not search.is_empty():
3192            for kind, stream in self._get_stream(sources[0], search):
3193                yield kind, stream
3194
3195    def missing_parents_rev_handler(self, substream):
3196        for content in substream:
3197            revision_bytes = content.get_bytes_as('fulltext')
3198            revision = self.from_serialiser.read_revision_from_string(
3199                revision_bytes)
3200            self.seen_revs.add(content.key[-1])
3201            self.referenced_revs.update(revision.parent_ids)
3202            yield content
3203
3204
3205class RemoteBranchLockableFiles(LockableFiles):
3206    """A 'LockableFiles' implementation that talks to a smart server.
3207
3208    This is not a public interface class.
3209    """
3210
3211    def __init__(self, bzrdir, _client):
3212        self.controldir = bzrdir
3213        self._client = _client
3214        self._need_find_modes = True
3215        LockableFiles.__init__(
3216            self, bzrdir.get_branch_transport(None),
3217            'lock', lockdir.LockDir)
3218
3219    def _find_modes(self):
3220        # RemoteBranches don't let the client set the mode of control files.
3221        self._dir_mode = None
3222        self._file_mode = None
3223
3224
3225class RemoteBranchFormat(branch.BranchFormat):
3226
3227    def __init__(self, network_name=None):
3228        super(RemoteBranchFormat, self).__init__()
3229        self._matchingcontroldir = RemoteBzrDirFormat()
3230        self._matchingcontroldir.set_branch_format(self)
3231        self._custom_format = None
3232        self._network_name = network_name
3233
3234    def __eq__(self, other):
3235        return (isinstance(other, RemoteBranchFormat)
3236                and self.__dict__ == other.__dict__)
3237
3238    def _ensure_real(self):
3239        if self._custom_format is None:
3240            try:
3241                self._custom_format = branch.network_format_registry.get(
3242                    self._network_name)
3243            except KeyError:
3244                raise errors.UnknownFormatError(kind='branch',
3245                                                format=self._network_name)
3246
3247    def get_format_description(self):
3248        self._ensure_real()
3249        return 'Remote: ' + self._custom_format.get_format_description()
3250
3251    def network_name(self):
3252        return self._network_name
3253
3254    def open(self, a_controldir, name=None, ignore_fallbacks=False):
3255        return a_controldir.open_branch(name=name,
3256                                        ignore_fallbacks=ignore_fallbacks)
3257
3258    def _vfs_initialize(self, a_controldir, name, append_revisions_only,
3259                        repository=None):
3260        # Initialisation when using a local bzrdir object, or a non-vfs init
3261        # method is not available on the server.
3262        # self._custom_format is always set - the start of initialize ensures
3263        # that.
3264        if isinstance(a_controldir, RemoteBzrDir):
3265            a_controldir._ensure_real()
3266            result = self._custom_format.initialize(a_controldir._real_bzrdir,
3267                                                    name=name, append_revisions_only=append_revisions_only,
3268                                                    repository=repository)
3269        else:
3270            # We assume the bzrdir is parameterised; it may not be.
3271            result = self._custom_format.initialize(a_controldir, name=name,
3272                                                    append_revisions_only=append_revisions_only,
3273                                                    repository=repository)
3274        if (isinstance(a_controldir, RemoteBzrDir)
3275                and not isinstance(result, RemoteBranch)):
3276            result = RemoteBranch(a_controldir, a_controldir.find_repository(), result,
3277                                  name=name)
3278        return result
3279
3280    def initialize(self, a_controldir, name=None, repository=None,
3281                   append_revisions_only=None):
3282        if name is None:
3283            name = a_controldir._get_selected_branch()
3284        # 1) get the network name to use.
3285        if self._custom_format:
3286            network_name = self._custom_format.network_name()
3287        else:
3288            # Select the current breezy default and ask for that.
3289            reference_bzrdir_format = controldir.format_registry.get(
3290                'default')()
3291            reference_format = reference_bzrdir_format.get_branch_format()
3292            self._custom_format = reference_format
3293            network_name = reference_format.network_name()
3294        # Being asked to create on a non RemoteBzrDir:
3295        if not isinstance(a_controldir, RemoteBzrDir):
3296            return self._vfs_initialize(a_controldir, name=name,
3297                                        append_revisions_only=append_revisions_only,
3298                                        repository=repository)
3299        medium = a_controldir._client._medium
3300        if medium._is_remote_before((1, 13)):
3301            return self._vfs_initialize(a_controldir, name=name,
3302                                        append_revisions_only=append_revisions_only,
3303                                        repository=repository)
3304        # Creating on a remote bzr dir.
3305        # 2) try direct creation via RPC
3306        path = a_controldir._path_for_remote_call(a_controldir._client)
3307        if name != "":
3308            # XXX JRV20100304: Support creating colocated branches
3309            raise errors.NoColocatedBranchSupport(self)
3310        verb = b'BzrDir.create_branch'
3311        try:
3312            response = a_controldir._call(verb, path, network_name)
3313        except errors.UnknownSmartMethod:
3314            # Fallback - use vfs methods
3315            medium._remember_remote_is_before((1, 13))
3316            return self._vfs_initialize(a_controldir, name=name,
3317                                        append_revisions_only=append_revisions_only,
3318                                        repository=repository)
3319        if response[0] != b'ok':
3320            raise errors.UnexpectedSmartServerResponse(response)
3321        # Turn the response into a RemoteRepository object.
3322        format = RemoteBranchFormat(network_name=response[1])
3323        repo_format = response_tuple_to_repo_format(response[3:])
3324        repo_path = response[2].decode('utf-8')
3325        if repository is not None:
3326            remote_repo_url = urlutils.join(a_controldir.user_url, repo_path)
3327            url_diff = urlutils.relative_url(repository.user_url,
3328                                             remote_repo_url)
3329            if url_diff != '.':
3330                raise AssertionError(
3331                    'repository.user_url %r does not match URL from server '
3332                    'response (%r + %r)'
3333                    % (repository.user_url, a_controldir.user_url, repo_path))
3334            remote_repo = repository
3335        else:
3336            if repo_path == '':
3337                repo_bzrdir = a_controldir
3338            else:
3339                repo_bzrdir = RemoteBzrDir(
3340                    a_controldir.root_transport.clone(
3341                        repo_path), a_controldir._format,
3342                    a_controldir._client)
3343            remote_repo = RemoteRepository(repo_bzrdir, repo_format)
3344        remote_branch = RemoteBranch(a_controldir, remote_repo,
3345                                     format=format, setup_stacking=False, name=name)
3346        if append_revisions_only:
3347            remote_branch.set_append_revisions_only(append_revisions_only)
3348        # XXX: We know this is a new branch, so it must have revno 0, revid
3349        # NULL_REVISION. Creating the branch locked would make this be unable
3350        # to be wrong; here its simply very unlikely to be wrong. RBC 20090225
3351        remote_branch._last_revision_info_cache = 0, NULL_REVISION
3352        return remote_branch
3353
3354    def make_tags(self, branch):
3355        self._ensure_real()
3356        return self._custom_format.make_tags(branch)
3357
3358    def supports_tags(self):
3359        # Remote branches might support tags, but we won't know until we
3360        # access the real remote branch.
3361        self._ensure_real()
3362        return self._custom_format.supports_tags()
3363
3364    def supports_stacking(self):
3365        self._ensure_real()
3366        return self._custom_format.supports_stacking()
3367
3368    def supports_set_append_revisions_only(self):
3369        self._ensure_real()
3370        return self._custom_format.supports_set_append_revisions_only()
3371
3372    @property
3373    def supports_reference_locations(self):
3374        self._ensure_real()
3375        return self._custom_format.supports_reference_locations
3376
3377    def stores_revno(self):
3378        return True
3379
3380    def _use_default_local_heads_to_fetch(self):
3381        # If the branch format is a metadir format *and* its heads_to_fetch
3382        # implementation is not overridden vs the base class, we can use the
3383        # base class logic rather than use the heads_to_fetch RPC.  This is
3384        # usually cheaper in terms of net round trips, as the last-revision and
3385        # tags info fetched is cached and would be fetched anyway.
3386        self._ensure_real()
3387        if isinstance(self._custom_format, bzrbranch.BranchFormatMetadir):
3388            branch_class = self._custom_format._branch_class()
3389            heads_to_fetch_impl = branch_class.heads_to_fetch
3390            if heads_to_fetch_impl is branch.Branch.heads_to_fetch:
3391                return True
3392        return False
3393
3394
3395class RemoteBranchStore(_mod_config.IniFileStore):
3396    """Branch store which attempts to use HPSS calls to retrieve branch store.
3397
3398    Note that this is specific to bzr-based formats.
3399    """
3400
3401    def __init__(self, branch):
3402        super(RemoteBranchStore, self).__init__()
3403        self.branch = branch
3404        self.id = "branch"
3405        self._real_store = None
3406
3407    def external_url(self):
3408        return urlutils.join(self.branch.user_url, 'branch.conf')
3409
3410    def _load_content(self):
3411        path = self.branch._remote_path()
3412        try:
3413            response, handler = self.branch._call_expecting_body(
3414                b'Branch.get_config_file', path)
3415        except errors.UnknownSmartMethod:
3416            self._ensure_real()
3417            return self._real_store._load_content()
3418        if len(response) and response[0] != b'ok':
3419            raise errors.UnexpectedSmartServerResponse(response)
3420        return handler.read_body_bytes()
3421
3422    def _save_content(self, content):
3423        path = self.branch._remote_path()
3424        try:
3425            response, handler = self.branch._call_with_body_bytes_expecting_body(
3426                b'Branch.put_config_file', (path,
3427                                            self.branch._lock_token, self.branch._repo_lock_token),
3428                content)
3429        except errors.UnknownSmartMethod:
3430            self._ensure_real()
3431            return self._real_store._save_content(content)
3432        handler.cancel_read_body()
3433        if response != (b'ok', ):
3434            raise errors.UnexpectedSmartServerResponse(response)
3435
3436    def _ensure_real(self):
3437        self.branch._ensure_real()
3438        if self._real_store is None:
3439            self._real_store = _mod_config.BranchStore(self.branch)
3440
3441
3442class RemoteBranch(branch.Branch, _RpcHelper, lock._RelockDebugMixin):
3443    """Branch stored on a server accessed by HPSS RPC.
3444
3445    At the moment most operations are mapped down to simple file operations.
3446    """
3447
3448    def __init__(self, remote_bzrdir, remote_repository, real_branch=None,
3449                 _client=None, format=None, setup_stacking=True, name=None,
3450                 possible_transports=None):
3451        """Create a RemoteBranch instance.
3452
3453        :param real_branch: An optional local implementation of the branch
3454            format, usually accessing the data via the VFS.
3455        :param _client: Private parameter for testing.
3456        :param format: A RemoteBranchFormat object, None to create one
3457            automatically. If supplied it should have a network_name already
3458            supplied.
3459        :param setup_stacking: If True make an RPC call to determine the
3460            stacked (or not) status of the branch. If False assume the branch
3461            is not stacked.
3462        :param name: Colocated branch name
3463        """
3464        # We intentionally don't call the parent class's __init__, because it
3465        # will try to assign to self.tags, which is a property in this subclass.
3466        # And the parent's __init__ doesn't do much anyway.
3467        self.controldir = remote_bzrdir
3468        self.name = name
3469        if _client is not None:
3470            self._client = _client
3471        else:
3472            self._client = remote_bzrdir._client
3473        self.repository = remote_repository
3474        if real_branch is not None:
3475            self._real_branch = real_branch
3476            # Give the remote repository the matching real repo.
3477            real_repo = self._real_branch.repository
3478            if isinstance(real_repo, RemoteRepository):
3479                real_repo._ensure_real()
3480                real_repo = real_repo._real_repository
3481            self.repository._set_real_repository(real_repo)
3482            # Give the branch the remote repository to let fast-pathing happen.
3483            self._real_branch.repository = self.repository
3484        else:
3485            self._real_branch = None
3486        # Fill out expected attributes of branch for breezy API users.
3487        self._clear_cached_state()
3488        # TODO: deprecate self.base in favor of user_url
3489        self.base = self.controldir.user_url
3490        self._name = name
3491        self._control_files = None
3492        self._lock_mode = None
3493        self._lock_token = None
3494        self._repo_lock_token = None
3495        self._lock_count = 0
3496        self._leave_lock = False
3497        self.conf_store = None
3498        # Setup a format: note that we cannot call _ensure_real until all the
3499        # attributes above are set: This code cannot be moved higher up in this
3500        # function.
3501        if format is None:
3502            self._format = RemoteBranchFormat()
3503            if real_branch is not None:
3504                self._format._network_name = \
3505                    self._real_branch._format.network_name()
3506        else:
3507            self._format = format
3508        # when we do _ensure_real we may need to pass ignore_fallbacks to the
3509        # branch.open_branch method.
3510        self._real_ignore_fallbacks = not setup_stacking
3511        if not self._format._network_name:
3512            # Did not get from open_branchV2 - old server.
3513            self._ensure_real()
3514            self._format._network_name = \
3515                self._real_branch._format.network_name()
3516        self.tags = self._format.make_tags(self)
3517        # The base class init is not called, so we duplicate this:
3518        hooks = branch.Branch.hooks['open']
3519        for hook in hooks:
3520            hook(self)
3521        self._is_stacked = False
3522        if setup_stacking:
3523            self._setup_stacking(possible_transports)
3524
3525    def _setup_stacking(self, possible_transports):
3526        # configure stacking into the remote repository, by reading it from
3527        # the vfs branch.
3528        try:
3529            fallback_url = self.get_stacked_on_url()
3530        except (errors.NotStacked, branch.UnstackableBranchFormat,
3531                errors.UnstackableRepositoryFormat) as e:
3532            return
3533        self._is_stacked = True
3534        if possible_transports is None:
3535            possible_transports = []
3536        else:
3537            possible_transports = list(possible_transports)
3538        possible_transports.append(self.controldir.root_transport)
3539        self._activate_fallback_location(fallback_url,
3540                                         possible_transports=possible_transports)
3541
3542    def _get_config(self):
3543        return RemoteBranchConfig(self)
3544
3545    def _get_config_store(self):
3546        if self.conf_store is None:
3547            self.conf_store = RemoteBranchStore(self)
3548        return self.conf_store
3549
3550    def store_uncommitted(self, creator):
3551        self._ensure_real()
3552        return self._real_branch.store_uncommitted(creator)
3553
3554    def get_unshelver(self, tree):
3555        self._ensure_real()
3556        return self._real_branch.get_unshelver(tree)
3557
3558    def _get_real_transport(self):
3559        # if we try vfs access, return the real branch's vfs transport
3560        self._ensure_real()
3561        return self._real_branch._transport
3562
3563    _transport = property(_get_real_transport)
3564
3565    def __str__(self):
3566        return "%s(%s)" % (self.__class__.__name__, self.base)
3567
3568    __repr__ = __str__
3569
3570    def _ensure_real(self):
3571        """Ensure that there is a _real_branch set.
3572
3573        Used before calls to self._real_branch.
3574        """
3575        if self._real_branch is None:
3576            if not vfs.vfs_enabled():
3577                raise AssertionError('smart server vfs must be enabled '
3578                                     'to use vfs implementation')
3579            self.controldir._ensure_real()
3580            self._real_branch = self.controldir._real_bzrdir.open_branch(
3581                ignore_fallbacks=self._real_ignore_fallbacks, name=self._name)
3582            # The remote branch and the real branch shares the same store. If
3583            # we don't, there will always be cases where one of the stores
3584            # doesn't see an update made on the other.
3585            self._real_branch.conf_store = self.conf_store
3586            if self.repository._real_repository is None:
3587                # Give the remote repository the matching real repo.
3588                real_repo = self._real_branch.repository
3589                if isinstance(real_repo, RemoteRepository):
3590                    real_repo._ensure_real()
3591                    real_repo = real_repo._real_repository
3592                self.repository._set_real_repository(real_repo)
3593            # Give the real branch the remote repository to let fast-pathing
3594            # happen.
3595            self._real_branch.repository = self.repository
3596            if self._lock_mode == 'r':
3597                self._real_branch.lock_read()
3598            elif self._lock_mode == 'w':
3599                self._real_branch.lock_write(token=self._lock_token)
3600
3601    def _translate_error(self, err, **context):
3602        self.repository._translate_error(err, branch=self, **context)
3603
3604    def _clear_cached_state(self):
3605        super(RemoteBranch, self)._clear_cached_state()
3606        self._tags_bytes = None
3607        if self._real_branch is not None:
3608            self._real_branch._clear_cached_state()
3609
3610    def _clear_cached_state_of_remote_branch_only(self):
3611        """Like _clear_cached_state, but doesn't clear the cache of
3612        self._real_branch.
3613
3614        This is useful when falling back to calling a method of
3615        self._real_branch that changes state.  In that case the underlying
3616        branch changes, so we need to invalidate this RemoteBranch's cache of
3617        it.  However, there's no need to invalidate the _real_branch's cache
3618        too, in fact doing so might harm performance.
3619        """
3620        super(RemoteBranch, self)._clear_cached_state()
3621
3622    @property
3623    def control_files(self):
3624        # Defer actually creating RemoteBranchLockableFiles until its needed,
3625        # because it triggers an _ensure_real that we otherwise might not need.
3626        if self._control_files is None:
3627            self._control_files = RemoteBranchLockableFiles(
3628                self.controldir, self._client)
3629        return self._control_files
3630
3631    def get_physical_lock_status(self):
3632        """See Branch.get_physical_lock_status()."""
3633        try:
3634            response = self._client.call(b'Branch.get_physical_lock_status',
3635                                         self._remote_path())
3636        except errors.UnknownSmartMethod:
3637            self._ensure_real()
3638            return self._real_branch.get_physical_lock_status()
3639        if response[0] not in (b'yes', b'no'):
3640            raise errors.UnexpectedSmartServerResponse(response)
3641        return (response[0] == b'yes')
3642
3643    def get_stacked_on_url(self):
3644        """Get the URL this branch is stacked against.
3645
3646        :raises NotStacked: If the branch is not stacked.
3647        :raises UnstackableBranchFormat: If the branch does not support
3648            stacking.
3649        :raises UnstackableRepositoryFormat: If the repository does not support
3650            stacking.
3651        """
3652        try:
3653            # there may not be a repository yet, so we can't use
3654            # self._translate_error, so we can't use self._call either.
3655            response = self._client.call(b'Branch.get_stacked_on_url',
3656                                         self._remote_path())
3657        except errors.ErrorFromSmartServer as err:
3658            # there may not be a repository yet, so we can't call through
3659            # its _translate_error
3660            _translate_error(err, branch=self)
3661        except errors.UnknownSmartMethod as err:
3662            self._ensure_real()
3663            return self._real_branch.get_stacked_on_url()
3664        if response[0] != b'ok':
3665            raise errors.UnexpectedSmartServerResponse(response)
3666        return response[1].decode('utf-8')
3667
3668    def set_stacked_on_url(self, url):
3669        branch.Branch.set_stacked_on_url(self, url)
3670        # We need the stacked_on_url to be visible both locally (to not query
3671        # it repeatedly) and remotely (so smart verbs can get it server side)
3672        # Without the following line,
3673        # breezy.tests.per_branch.test_create_clone.TestCreateClone
3674        # .test_create_clone_on_transport_stacked_hooks_get_stacked_branch
3675        # fails for remote branches -- vila 2012-01-04
3676        self.conf_store.save_changes()
3677        if not url:
3678            self._is_stacked = False
3679        else:
3680            self._is_stacked = True
3681
3682    def _vfs_get_tags_bytes(self):
3683        self._ensure_real()
3684        return self._real_branch._get_tags_bytes()
3685
3686    def _get_tags_bytes(self):
3687        with self.lock_read():
3688            if self._tags_bytes is None:
3689                self._tags_bytes = self._get_tags_bytes_via_hpss()
3690            return self._tags_bytes
3691
3692    def _get_tags_bytes_via_hpss(self):
3693        medium = self._client._medium
3694        if medium._is_remote_before((1, 13)):
3695            return self._vfs_get_tags_bytes()
3696        try:
3697            response = self._call(
3698                b'Branch.get_tags_bytes', self._remote_path())
3699        except errors.UnknownSmartMethod:
3700            medium._remember_remote_is_before((1, 13))
3701            return self._vfs_get_tags_bytes()
3702        return response[0]
3703
3704    def _vfs_set_tags_bytes(self, bytes):
3705        self._ensure_real()
3706        return self._real_branch._set_tags_bytes(bytes)
3707
3708    def _set_tags_bytes(self, bytes):
3709        if self.is_locked():
3710            self._tags_bytes = bytes
3711        medium = self._client._medium
3712        if medium._is_remote_before((1, 18)):
3713            self._vfs_set_tags_bytes(bytes)
3714            return
3715        try:
3716            args = (
3717                self._remote_path(), self._lock_token, self._repo_lock_token)
3718            response = self._call_with_body_bytes(
3719                b'Branch.set_tags_bytes', args, bytes)
3720        except errors.UnknownSmartMethod:
3721            medium._remember_remote_is_before((1, 18))
3722            self._vfs_set_tags_bytes(bytes)
3723
3724    def lock_read(self):
3725        """Lock the branch for read operations.
3726
3727        :return: A breezy.lock.LogicalLockResult.
3728        """
3729        self.repository.lock_read()
3730        if not self._lock_mode:
3731            self._note_lock('r')
3732            self._lock_mode = 'r'
3733            self._lock_count = 1
3734            if self._real_branch is not None:
3735                self._real_branch.lock_read()
3736        else:
3737            self._lock_count += 1
3738        return lock.LogicalLockResult(self.unlock)
3739
3740    def _remote_lock_write(self, token):
3741        if token is None:
3742            branch_token = repo_token = b''
3743        else:
3744            branch_token = token
3745            repo_token = self.repository.lock_write().repository_token
3746            self.repository.unlock()
3747        err_context = {'token': token}
3748        try:
3749            response = self._call(
3750                b'Branch.lock_write', self._remote_path(), branch_token,
3751                repo_token or b'', **err_context)
3752        except errors.LockContention as e:
3753            # The LockContention from the server doesn't have any
3754            # information about the lock_url. We re-raise LockContention
3755            # with valid lock_url.
3756            raise errors.LockContention('(remote lock)',
3757                                        self.repository.base.split('.bzr/')[0])
3758        if response[0] != b'ok':
3759            raise errors.UnexpectedSmartServerResponse(response)
3760        ok, branch_token, repo_token = response
3761        return branch_token, repo_token
3762
3763    def lock_write(self, token=None):
3764        if not self._lock_mode:
3765            self._note_lock('w')
3766            # Lock the branch and repo in one remote call.
3767            remote_tokens = self._remote_lock_write(token)
3768            self._lock_token, self._repo_lock_token = remote_tokens
3769            if not self._lock_token:
3770                raise SmartProtocolError(
3771                    'Remote server did not return a token!')
3772            # Tell the self.repository object that it is locked.
3773            self.repository.lock_write(
3774                self._repo_lock_token, _skip_rpc=True)
3775
3776            if self._real_branch is not None:
3777                self._real_branch.lock_write(token=self._lock_token)
3778            if token is not None:
3779                self._leave_lock = True
3780            else:
3781                self._leave_lock = False
3782            self._lock_mode = 'w'
3783            self._lock_count = 1
3784        elif self._lock_mode == 'r':
3785            raise errors.ReadOnlyError(self)
3786        else:
3787            if token is not None:
3788                # A token was given to lock_write, and we're relocking, so
3789                # check that the given token actually matches the one we
3790                # already have.
3791                if token != self._lock_token:
3792                    raise errors.TokenMismatch(token, self._lock_token)
3793            self._lock_count += 1
3794            # Re-lock the repository too.
3795            self.repository.lock_write(self._repo_lock_token)
3796        return BranchWriteLockResult(self.unlock, self._lock_token or None)
3797
3798    def _unlock(self, branch_token, repo_token):
3799        err_context = {'token': str((branch_token, repo_token))}
3800        response = self._call(
3801            b'Branch.unlock', self._remote_path(), branch_token,
3802            repo_token or b'', **err_context)
3803        if response == (b'ok',):
3804            return
3805        raise errors.UnexpectedSmartServerResponse(response)
3806
3807    @only_raises(errors.LockNotHeld, errors.LockBroken)
3808    def unlock(self):
3809        try:
3810            self._lock_count -= 1
3811            if not self._lock_count:
3812                if self.conf_store is not None:
3813                    self.conf_store.save_changes()
3814                self._clear_cached_state()
3815                mode = self._lock_mode
3816                self._lock_mode = None
3817                if self._real_branch is not None:
3818                    if (not self._leave_lock and mode == 'w'
3819                            and self._repo_lock_token):
3820                        # If this RemoteBranch will remove the physical lock
3821                        # for the repository, make sure the _real_branch
3822                        # doesn't do it first.  (Because the _real_branch's
3823                        # repository is set to be the RemoteRepository.)
3824                        self._real_branch.repository.leave_lock_in_place()
3825                    self._real_branch.unlock()
3826                if mode != 'w':
3827                    # Only write-locked branched need to make a remote method
3828                    # call to perform the unlock.
3829                    return
3830                if not self._lock_token:
3831                    raise AssertionError('Locked, but no token!')
3832                branch_token = self._lock_token
3833                repo_token = self._repo_lock_token
3834                self._lock_token = None
3835                self._repo_lock_token = None
3836                if not self._leave_lock:
3837                    self._unlock(branch_token, repo_token)
3838        finally:
3839            self.repository.unlock()
3840
3841    def break_lock(self):
3842        try:
3843            response = self._call(
3844                b'Branch.break_lock', self._remote_path())
3845        except errors.UnknownSmartMethod:
3846            self._ensure_real()
3847            return self._real_branch.break_lock()
3848        if response != (b'ok',):
3849            raise errors.UnexpectedSmartServerResponse(response)
3850
3851    def leave_lock_in_place(self):
3852        if not self._lock_token:
3853            raise NotImplementedError(self.leave_lock_in_place)
3854        self._leave_lock = True
3855
3856    def dont_leave_lock_in_place(self):
3857        if not self._lock_token:
3858            raise NotImplementedError(self.dont_leave_lock_in_place)
3859        self._leave_lock = False
3860
3861    def get_rev_id(self, revno, history=None):
3862        if revno == 0:
3863            return _mod_revision.NULL_REVISION
3864        with self.lock_read():
3865            last_revision_info = self.last_revision_info()
3866            if revno < 0:
3867                raise errors.RevnoOutOfBounds(
3868                    revno, (0, last_revision_info[0]))
3869            ok, result = self.repository.get_rev_id_for_revno(
3870                revno, last_revision_info)
3871            if ok:
3872                return result
3873            missing_parent = result[1]
3874            # Either the revision named by the server is missing, or its parent
3875            # is.  Call get_parent_map to determine which, so that we report a
3876            # useful error.
3877            parent_map = self.repository.get_parent_map([missing_parent])
3878            if missing_parent in parent_map:
3879                missing_parent = parent_map[missing_parent]
3880            raise errors.NoSuchRevision(self, missing_parent)
3881
3882    def _read_last_revision_info(self):
3883        response = self._call(
3884            b'Branch.last_revision_info', self._remote_path())
3885        if response[0] != b'ok':
3886            raise SmartProtocolError(
3887                'unexpected response code %s' % (response,))
3888        revno = int(response[1])
3889        last_revision = response[2]
3890        return (revno, last_revision)
3891
3892    def _gen_revision_history(self):
3893        """See Branch._gen_revision_history()."""
3894        if self._is_stacked:
3895            self._ensure_real()
3896            return self._real_branch._gen_revision_history()
3897        response_tuple, response_handler = self._call_expecting_body(
3898            b'Branch.revision_history', self._remote_path())
3899        if response_tuple[0] != b'ok':
3900            raise errors.UnexpectedSmartServerResponse(response_tuple)
3901        result = response_handler.read_body_bytes().split(b'\x00')
3902        if result == ['']:
3903            return []
3904        return result
3905
3906    def _remote_path(self):
3907        return self.controldir._path_for_remote_call(self._client)
3908
3909    def _set_last_revision_descendant(self, revision_id, other_branch,
3910                                      allow_diverged=False, allow_overwrite_descendant=False):
3911        # This performs additional work to meet the hook contract; while its
3912        # undesirable, we have to synthesise the revno to call the hook, and
3913        # not calling the hook is worse as it means changes can't be prevented.
3914        # Having calculated this though, we can't just call into
3915        # set_last_revision_info as a simple call, because there is a set_rh
3916        # hook that some folk may still be using.
3917        old_revno, old_revid = self.last_revision_info()
3918        history = self._lefthand_history(revision_id)
3919        self._run_pre_change_branch_tip_hooks(len(history), revision_id)
3920        err_context = {'other_branch': other_branch}
3921        response = self._call(b'Branch.set_last_revision_ex',
3922                              self._remote_path(), self._lock_token, self._repo_lock_token,
3923                              revision_id, int(allow_diverged), int(
3924                                  allow_overwrite_descendant),
3925                              **err_context)
3926        self._clear_cached_state()
3927        if len(response) != 3 and response[0] != b'ok':
3928            raise errors.UnexpectedSmartServerResponse(response)
3929        new_revno, new_revision_id = response[1:]
3930        self._last_revision_info_cache = new_revno, new_revision_id
3931        self._run_post_change_branch_tip_hooks(old_revno, old_revid)
3932        if self._real_branch is not None:
3933            cache = new_revno, new_revision_id
3934            self._real_branch._last_revision_info_cache = cache
3935
3936    def _set_last_revision(self, revision_id):
3937        old_revno, old_revid = self.last_revision_info()
3938        # This performs additional work to meet the hook contract; while its
3939        # undesirable, we have to synthesise the revno to call the hook, and
3940        # not calling the hook is worse as it means changes can't be prevented.
3941        # Having calculated this though, we can't just call into
3942        # set_last_revision_info as a simple call, because there is a set_rh
3943        # hook that some folk may still be using.
3944        history = self._lefthand_history(revision_id)
3945        self._run_pre_change_branch_tip_hooks(len(history), revision_id)
3946        self._clear_cached_state()
3947        response = self._call(b'Branch.set_last_revision',
3948                              self._remote_path(), self._lock_token, self._repo_lock_token,
3949                              revision_id)
3950        if response != (b'ok',):
3951            raise errors.UnexpectedSmartServerResponse(response)
3952        self._run_post_change_branch_tip_hooks(old_revno, old_revid)
3953
3954    def _get_parent_location(self):
3955        medium = self._client._medium
3956        if medium._is_remote_before((1, 13)):
3957            return self._vfs_get_parent_location()
3958        try:
3959            response = self._call(b'Branch.get_parent', self._remote_path())
3960        except errors.UnknownSmartMethod:
3961            medium._remember_remote_is_before((1, 13))
3962            return self._vfs_get_parent_location()
3963        if len(response) != 1:
3964            raise errors.UnexpectedSmartServerResponse(response)
3965        parent_location = response[0]
3966        if parent_location == b'':
3967            return None
3968        return parent_location.decode('utf-8')
3969
3970    def _vfs_get_parent_location(self):
3971        self._ensure_real()
3972        return self._real_branch._get_parent_location()
3973
3974    def _set_parent_location(self, url):
3975        medium = self._client._medium
3976        if medium._is_remote_before((1, 15)):
3977            return self._vfs_set_parent_location(url)
3978        try:
3979            call_url = url or u''
3980            if isinstance(call_url, str):
3981                call_url = call_url.encode('utf-8')
3982            response = self._call(b'Branch.set_parent_location',
3983                                  self._remote_path(), self._lock_token, self._repo_lock_token,
3984                                  call_url)
3985        except errors.UnknownSmartMethod:
3986            medium._remember_remote_is_before((1, 15))
3987            return self._vfs_set_parent_location(url)
3988        if response != ():
3989            raise errors.UnexpectedSmartServerResponse(response)
3990
3991    def _vfs_set_parent_location(self, url):
3992        self._ensure_real()
3993        return self._real_branch._set_parent_location(url)
3994
3995    def pull(self, source, overwrite=False, stop_revision=None,
3996             **kwargs):
3997        with self.lock_write():
3998            self._clear_cached_state_of_remote_branch_only()
3999            self._ensure_real()
4000            return self._real_branch.pull(
4001                source, overwrite=overwrite, stop_revision=stop_revision,
4002                _override_hook_target=self, **kwargs)
4003
4004    def push(self, target, overwrite=False, stop_revision=None, lossy=False, tag_selector=None):
4005        with self.lock_read():
4006            self._ensure_real()
4007            return self._real_branch.push(
4008                target, overwrite=overwrite, stop_revision=stop_revision, lossy=lossy,
4009                _override_hook_source_branch=self, tag_selector=tag_selector)
4010
4011    def peek_lock_mode(self):
4012        return self._lock_mode
4013
4014    def is_locked(self):
4015        return self._lock_count >= 1
4016
4017    def revision_id_to_dotted_revno(self, revision_id):
4018        """Given a revision id, return its dotted revno.
4019
4020        :return: a tuple like (1,) or (400,1,3).
4021        """
4022        with self.lock_read():
4023            try:
4024                response = self._call(b'Branch.revision_id_to_revno',
4025                                      self._remote_path(), revision_id)
4026            except errors.UnknownSmartMethod:
4027                self._ensure_real()
4028                return self._real_branch.revision_id_to_dotted_revno(revision_id)
4029            except errors.UnknownErrorFromSmartServer as e:
4030                # Deal with older versions of bzr/brz that didn't explicitly
4031                # wrap GhostRevisionsHaveNoRevno.
4032                if e.error_tuple[1] == b'GhostRevisionsHaveNoRevno':
4033                    (revid, ghost_revid) = re.findall(b"{([^}]+)}", e.error_tuple[2])
4034                    raise errors.GhostRevisionsHaveNoRevno(
4035                        revid, ghost_revid)
4036                raise
4037            if response[0] == b'ok':
4038                return tuple([int(x) for x in response[1:]])
4039            else:
4040                raise errors.UnexpectedSmartServerResponse(response)
4041
4042    def revision_id_to_revno(self, revision_id):
4043        """Given a revision id on the branch mainline, return its revno.
4044
4045        :return: an integer
4046        """
4047        with self.lock_read():
4048            try:
4049                response = self._call(b'Branch.revision_id_to_revno',
4050                                      self._remote_path(), revision_id)
4051            except errors.UnknownSmartMethod:
4052                self._ensure_real()
4053                return self._real_branch.revision_id_to_revno(revision_id)
4054            if response[0] == b'ok':
4055                if len(response) == 2:
4056                    return int(response[1])
4057                raise NoSuchRevision(self, revision_id)
4058            else:
4059                raise errors.UnexpectedSmartServerResponse(response)
4060
4061    def set_last_revision_info(self, revno, revision_id):
4062        with self.lock_write():
4063            # XXX: These should be returned by the set_last_revision_info verb
4064            old_revno, old_revid = self.last_revision_info()
4065            self._run_pre_change_branch_tip_hooks(revno, revision_id)
4066            if not revision_id or not isinstance(revision_id, bytes):
4067                raise errors.InvalidRevisionId(
4068                    revision_id=revision_id, branch=self)
4069            try:
4070                response = self._call(b'Branch.set_last_revision_info',
4071                                      self._remote_path(), self._lock_token, self._repo_lock_token,
4072                                      str(revno).encode('ascii'), revision_id)
4073            except errors.UnknownSmartMethod:
4074                self._ensure_real()
4075                self._clear_cached_state_of_remote_branch_only()
4076                self._real_branch.set_last_revision_info(revno, revision_id)
4077                self._last_revision_info_cache = revno, revision_id
4078                return
4079            if response == (b'ok',):
4080                self._clear_cached_state()
4081                self._last_revision_info_cache = revno, revision_id
4082                self._run_post_change_branch_tip_hooks(old_revno, old_revid)
4083                # Update the _real_branch's cache too.
4084                if self._real_branch is not None:
4085                    cache = self._last_revision_info_cache
4086                    self._real_branch._last_revision_info_cache = cache
4087            else:
4088                raise errors.UnexpectedSmartServerResponse(response)
4089
4090    def generate_revision_history(self, revision_id, last_rev=None,
4091                                  other_branch=None):
4092        with self.lock_write():
4093            medium = self._client._medium
4094            if not medium._is_remote_before((1, 6)):
4095                # Use a smart method for 1.6 and above servers
4096                try:
4097                    self._set_last_revision_descendant(revision_id, other_branch,
4098                                                       allow_diverged=True, allow_overwrite_descendant=True)
4099                    return
4100                except errors.UnknownSmartMethod:
4101                    medium._remember_remote_is_before((1, 6))
4102            self._clear_cached_state_of_remote_branch_only()
4103            graph = self.repository.get_graph()
4104            (last_revno, last_revid) = self.last_revision_info()
4105            known_revision_ids = [
4106                (last_revid, last_revno),
4107                (_mod_revision.NULL_REVISION, 0),
4108                ]
4109            if last_rev is not None:
4110                if not graph.is_ancestor(last_rev, revision_id):
4111                    # our previous tip is not merged into stop_revision
4112                    raise errors.DivergedBranches(self, other_branch)
4113            revno = graph.find_distance_to_null(
4114                revision_id, known_revision_ids)
4115            self.set_last_revision_info(revno, revision_id)
4116
4117    def set_push_location(self, location):
4118        self._set_config_location('push_location', location)
4119
4120    def heads_to_fetch(self):
4121        if self._format._use_default_local_heads_to_fetch():
4122            # We recognise this format, and its heads-to-fetch implementation
4123            # is the default one (tip + tags).  In this case it's cheaper to
4124            # just use the default implementation rather than a special RPC as
4125            # the tip and tags data is cached.
4126            return branch.Branch.heads_to_fetch(self)
4127        medium = self._client._medium
4128        if medium._is_remote_before((2, 4)):
4129            return self._vfs_heads_to_fetch()
4130        try:
4131            return self._rpc_heads_to_fetch()
4132        except errors.UnknownSmartMethod:
4133            medium._remember_remote_is_before((2, 4))
4134            return self._vfs_heads_to_fetch()
4135
4136    def _rpc_heads_to_fetch(self):
4137        response = self._call(b'Branch.heads_to_fetch', self._remote_path())
4138        if len(response) != 2:
4139            raise errors.UnexpectedSmartServerResponse(response)
4140        must_fetch, if_present_fetch = response
4141        return set(must_fetch), set(if_present_fetch)
4142
4143    def _vfs_heads_to_fetch(self):
4144        self._ensure_real()
4145        return self._real_branch.heads_to_fetch()
4146
4147    def reconcile(self, thorough=True):
4148        """Make sure the data stored in this branch is consistent."""
4149        from .reconcile import BranchReconciler
4150        with self.lock_write():
4151            reconciler = BranchReconciler(self, thorough=thorough)
4152            return reconciler.reconcile()
4153
4154    def get_reference_info(self, file_id):
4155        """Get the tree_path and branch_location for a tree reference."""
4156        if not self._format.supports_reference_locations:
4157            raise errors.UnsupportedOperation(self.get_reference_info, self)
4158        return self._get_all_reference_info().get(file_id, (None, None))
4159
4160    def set_reference_info(self, file_id, branch_location, tree_path=None):
4161        """Set the branch location to use for a tree reference."""
4162        if not self._format.supports_reference_locations:
4163            raise errors.UnsupportedOperation(self.set_reference_info, self)
4164        self._ensure_real()
4165        self._real_branch.set_reference_info(
4166            file_id, branch_location, tree_path)
4167
4168    def _set_all_reference_info(self, reference_info):
4169        if not self._format.supports_reference_locations:
4170            raise errors.UnsupportedOperation(self.set_reference_info, self)
4171        self._ensure_real()
4172        self._real_branch._set_all_reference_info(reference_info)
4173
4174    def _get_all_reference_info(self):
4175        if not self._format.supports_reference_locations:
4176            return {}
4177        try:
4178            response, handler = self._call_expecting_body(
4179                b'Branch.get_all_reference_info', self._remote_path())
4180        except errors.UnknownSmartMethod:
4181            self._ensure_real()
4182            return self._real_branch._get_all_reference_info()
4183        if len(response) and response[0] != b'ok':
4184            raise errors.UnexpectedSmartServerResponse(response)
4185        ret = {}
4186        for (f, u, p) in bencode.bdecode(handler.read_body_bytes()):
4187            ret[f] = (u.decode('utf-8'), p.decode('utf-8') if p else None)
4188        return ret
4189
4190    def reference_parent(self, file_id, path, possible_transports=None):
4191        """Return the parent branch for a tree-reference.
4192
4193        :param path: The path of the nested tree in the tree
4194        :return: A branch associated with the nested tree
4195        """
4196        branch_location = self.get_reference_info(file_id)[0]
4197        if branch_location is None:
4198            try:
4199                return branch.Branch.open_from_transport(
4200                    self.controldir.root_transport.clone(path),
4201                    possible_transports=possible_transports)
4202            except errors.NotBranchError:
4203                return None
4204        return branch.Branch.open(
4205            urlutils.join(
4206                urlutils.strip_segment_parameters(self.user_url), branch_location),
4207            possible_transports=possible_transports)
4208
4209
4210class RemoteConfig(object):
4211    """A Config that reads and writes from smart verbs.
4212
4213    It is a low-level object that considers config data to be name/value pairs
4214    that may be associated with a section. Assigning meaning to the these
4215    values is done at higher levels like breezy.config.TreeConfig.
4216    """
4217
4218    def get_option(self, name, section=None, default=None):
4219        """Return the value associated with a named option.
4220
4221        :param name: The name of the value
4222        :param section: The section the option is in (if any)
4223        :param default: The value to return if the value is not set
4224        :return: The value or default value
4225        """
4226        try:
4227            configobj = self._get_configobj()
4228            section_obj = None
4229            if section is None:
4230                section_obj = configobj
4231            else:
4232                try:
4233                    section_obj = configobj[section]
4234                except KeyError:
4235                    pass
4236            if section_obj is None:
4237                value = default
4238            else:
4239                value = section_obj.get(name, default)
4240        except errors.UnknownSmartMethod:
4241            value = self._vfs_get_option(name, section, default)
4242        for hook in _mod_config.OldConfigHooks['get']:
4243            hook(self, name, value)
4244        return value
4245
4246    def _response_to_configobj(self, response):
4247        if len(response[0]) and response[0][0] != b'ok':
4248            raise errors.UnexpectedSmartServerResponse(response)
4249        lines = response[1].read_body_bytes().splitlines()
4250        conf = _mod_config.ConfigObj(lines, encoding='utf-8')
4251        for hook in _mod_config.OldConfigHooks['load']:
4252            hook(self)
4253        return conf
4254
4255
4256class RemoteBranchConfig(RemoteConfig):
4257    """A RemoteConfig for Branches."""
4258
4259    def __init__(self, branch):
4260        self._branch = branch
4261
4262    def _get_configobj(self):
4263        path = self._branch._remote_path()
4264        response = self._branch._client.call_expecting_body(
4265            b'Branch.get_config_file', path)
4266        return self._response_to_configobj(response)
4267
4268    def set_option(self, value, name, section=None):
4269        """Set the value associated with a named option.
4270
4271        :param value: The value to set
4272        :param name: The name of the value to set
4273        :param section: The section the option is in (if any)
4274        """
4275        medium = self._branch._client._medium
4276        if medium._is_remote_before((1, 14)):
4277            return self._vfs_set_option(value, name, section)
4278        if isinstance(value, dict):
4279            if medium._is_remote_before((2, 2)):
4280                return self._vfs_set_option(value, name, section)
4281            return self._set_config_option_dict(value, name, section)
4282        else:
4283            return self._set_config_option(value, name, section)
4284
4285    def _set_config_option(self, value, name, section):
4286        if isinstance(value, (bool, int)):
4287            value = str(value)
4288        elif isinstance(value, str):
4289            pass
4290        else:
4291            raise TypeError(value)
4292        try:
4293            path = self._branch._remote_path()
4294            response = self._branch._client.call(b'Branch.set_config_option',
4295                                                 path, self._branch._lock_token, self._branch._repo_lock_token,
4296                                                 value.encode('utf-8'), name.encode('utf-8'),
4297                                                 (section or '').encode('utf-8'))
4298        except errors.UnknownSmartMethod:
4299            medium = self._branch._client._medium
4300            medium._remember_remote_is_before((1, 14))
4301            return self._vfs_set_option(value, name, section)
4302        if response != ():
4303            raise errors.UnexpectedSmartServerResponse(response)
4304
4305    def _serialize_option_dict(self, option_dict):
4306        utf8_dict = {}
4307        for key, value in option_dict.items():
4308            if isinstance(key, str):
4309                key = key.encode('utf8')
4310            if isinstance(value, str):
4311                value = value.encode('utf8')
4312            utf8_dict[key] = value
4313        return bencode.bencode(utf8_dict)
4314
4315    def _set_config_option_dict(self, value, name, section):
4316        try:
4317            path = self._branch._remote_path()
4318            serialised_dict = self._serialize_option_dict(value)
4319            response = self._branch._client.call(
4320                b'Branch.set_config_option_dict',
4321                path, self._branch._lock_token, self._branch._repo_lock_token,
4322                serialised_dict, name.encode('utf-8'), (section or '').encode('utf-8'))
4323        except errors.UnknownSmartMethod:
4324            medium = self._branch._client._medium
4325            medium._remember_remote_is_before((2, 2))
4326            return self._vfs_set_option(value, name, section)
4327        if response != ():
4328            raise errors.UnexpectedSmartServerResponse(response)
4329
4330    def _real_object(self):
4331        self._branch._ensure_real()
4332        return self._branch._real_branch
4333
4334    def _vfs_set_option(self, value, name, section=None):
4335        return self._real_object()._get_config().set_option(
4336            value, name, section)
4337
4338
4339class RemoteBzrDirConfig(RemoteConfig):
4340    """A RemoteConfig for BzrDirs."""
4341
4342    def __init__(self, bzrdir):
4343        self._bzrdir = bzrdir
4344
4345    def _get_configobj(self):
4346        medium = self._bzrdir._client._medium
4347        verb = b'BzrDir.get_config_file'
4348        if medium._is_remote_before((1, 15)):
4349            raise errors.UnknownSmartMethod(verb)
4350        path = self._bzrdir._path_for_remote_call(self._bzrdir._client)
4351        response = self._bzrdir._call_expecting_body(
4352            verb, path)
4353        return self._response_to_configobj(response)
4354
4355    def _vfs_get_option(self, name, section, default):
4356        return self._real_object()._get_config().get_option(
4357            name, section, default)
4358
4359    def set_option(self, value, name, section=None):
4360        """Set the value associated with a named option.
4361
4362        :param value: The value to set
4363        :param name: The name of the value to set
4364        :param section: The section the option is in (if any)
4365        """
4366        return self._real_object()._get_config().set_option(
4367            value, name, section)
4368
4369    def _real_object(self):
4370        self._bzrdir._ensure_real()
4371        return self._bzrdir._real_bzrdir
4372
4373
4374error_translators = registry.Registry()
4375no_context_error_translators = registry.Registry()
4376
4377
4378def _translate_error(err, **context):
4379    """Translate an ErrorFromSmartServer into a more useful error.
4380
4381    Possible context keys:
4382      - branch
4383      - repository
4384      - bzrdir
4385      - token
4386      - other_branch
4387      - path
4388
4389    If the error from the server doesn't match a known pattern, then
4390    UnknownErrorFromSmartServer is raised.
4391    """
4392    def find(name):
4393        try:
4394            return context[name]
4395        except KeyError:
4396            mutter('Missing key \'%s\' in context %r', name, context)
4397            raise err
4398
4399    def get_path():
4400        """Get the path from the context if present, otherwise use first error
4401        arg.
4402        """
4403        try:
4404            return context['path']
4405        except KeyError:
4406            try:
4407                return err.error_args[0].decode('utf-8')
4408            except IndexError:
4409                mutter('Missing key \'path\' in context %r', context)
4410                raise err
4411    if not isinstance(err.error_verb, bytes):
4412        raise TypeError(err.error_verb)
4413    try:
4414        translator = error_translators.get(err.error_verb)
4415    except KeyError:
4416        pass
4417    else:
4418        raise translator(err, find, get_path)
4419    try:
4420        translator = no_context_error_translators.get(err.error_verb)
4421    except KeyError:
4422        raise errors.UnknownErrorFromSmartServer(err)
4423    else:
4424        raise translator(err)
4425
4426
4427error_translators.register(b'NoSuchRevision',
4428                           lambda err, find, get_path: NoSuchRevision(
4429                               find('branch'), err.error_args[0]))
4430error_translators.register(b'nosuchrevision',
4431                           lambda err, find, get_path: NoSuchRevision(
4432                               find('repository'), err.error_args[0]))
4433error_translators.register(
4434    b'revno-outofbounds',
4435    lambda err, find, get_path: errors.RevnoOutOfBounds(
4436        err.error_args[0], (err.error_args[1], err.error_args[2])))
4437
4438
4439def _translate_nobranch_error(err, find, get_path):
4440    if len(err.error_args) >= 1:
4441        extra = err.error_args[0].decode('utf-8')
4442    else:
4443        extra = None
4444    return errors.NotBranchError(path=find('bzrdir').root_transport.base,
4445                                 detail=extra)
4446
4447
4448error_translators.register(b'nobranch', _translate_nobranch_error)
4449error_translators.register(b'norepository',
4450                           lambda err, find, get_path: errors.NoRepositoryPresent(
4451                               find('bzrdir')))
4452error_translators.register(b'UnlockableTransport',
4453                           lambda err, find, get_path: errors.UnlockableTransport(
4454                               find('bzrdir').root_transport))
4455error_translators.register(b'TokenMismatch',
4456                           lambda err, find, get_path: errors.TokenMismatch(
4457                               find('token'), '(remote token)'))
4458error_translators.register(b'Diverged',
4459                           lambda err, find, get_path: errors.DivergedBranches(
4460                               find('branch'), find('other_branch')))
4461error_translators.register(b'NotStacked',
4462                           lambda err, find, get_path: errors.NotStacked(branch=find('branch')))
4463
4464
4465def _translate_PermissionDenied(err, find, get_path):
4466    path = get_path()
4467    if len(err.error_args) >= 2:
4468        extra = err.error_args[1].decode('utf-8')
4469    else:
4470        extra = None
4471    return errors.PermissionDenied(path, extra=extra)
4472
4473
4474error_translators.register(b'PermissionDenied', _translate_PermissionDenied)
4475error_translators.register(b'ReadError',
4476                           lambda err, find, get_path: errors.ReadError(get_path()))
4477error_translators.register(b'NoSuchFile',
4478                           lambda err, find, get_path: errors.NoSuchFile(get_path()))
4479error_translators.register(b'TokenLockingNotSupported',
4480                           lambda err, find, get_path: errors.TokenLockingNotSupported(
4481                               find('repository')))
4482error_translators.register(b'UnsuspendableWriteGroup',
4483                           lambda err, find, get_path: errors.UnsuspendableWriteGroup(
4484                               repository=find('repository')))
4485error_translators.register(b'UnresumableWriteGroup',
4486                           lambda err, find, get_path: errors.UnresumableWriteGroup(
4487                               repository=find('repository'), write_groups=err.error_args[0],
4488                               reason=err.error_args[1]))
4489error_translators.register(b'AlreadyControlDir',
4490                           lambda err, find, get_path: errors.AlreadyControlDirError(get_path()))
4491
4492no_context_error_translators.register(b'GhostRevisionsHaveNoRevno',
4493                                      lambda err: errors.GhostRevisionsHaveNoRevno(*err.error_args))
4494no_context_error_translators.register(b'IncompatibleRepositories',
4495                                      lambda err: errors.IncompatibleRepositories(
4496                                          err.error_args[0].decode('utf-8'), err.error_args[1].decode('utf-8'), err.error_args[2].decode('utf-8')))
4497no_context_error_translators.register(b'LockContention',
4498                                      lambda err: errors.LockContention('(remote lock)'))
4499no_context_error_translators.register(b'LockFailed',
4500                                      lambda err: errors.LockFailed(err.error_args[0].decode('utf-8'), err.error_args[1].decode('utf-8')))
4501no_context_error_translators.register(b'TipChangeRejected',
4502                                      lambda err: errors.TipChangeRejected(err.error_args[0].decode('utf8')))
4503no_context_error_translators.register(b'UnstackableBranchFormat',
4504                                      lambda err: branch.UnstackableBranchFormat(*err.error_args))
4505no_context_error_translators.register(b'UnstackableRepositoryFormat',
4506                                      lambda err: errors.UnstackableRepositoryFormat(*err.error_args))
4507no_context_error_translators.register(b'FileExists',
4508                                      lambda err: errors.FileExists(err.error_args[0].decode('utf-8')))
4509no_context_error_translators.register(b'DirectoryNotEmpty',
4510                                      lambda err: errors.DirectoryNotEmpty(err.error_args[0].decode('utf-8')))
4511no_context_error_translators.register(b'UnknownFormat',
4512                                      lambda err: errors.UnknownFormatError(
4513                                          err.error_args[0].decode('ascii'), err.error_args[0].decode('ascii')))
4514no_context_error_translators.register(b'InvalidURL',
4515                                      lambda err: urlutils.InvalidURL(
4516                                          err.error_args[0].decode('utf-8'), err.error_args[1].decode('ascii')))
4517
4518
4519def _translate_short_readv_error(err):
4520    args = err.error_args
4521    return errors.ShortReadvError(
4522        args[0].decode('utf-8'),
4523        int(args[1].decode('ascii')), int(args[2].decode('ascii')),
4524        int(args[3].decode('ascii')))
4525
4526
4527no_context_error_translators.register(b'ShortReadvError',
4528                                      _translate_short_readv_error)
4529
4530
4531def _translate_unicode_error(err):
4532    encoding = err.error_args[0].decode('ascii')
4533    val = err.error_args[1].decode('utf-8')
4534    start = int(err.error_args[2].decode('ascii'))
4535    end = int(err.error_args[3].decode('ascii'))
4536    reason = err.error_args[4].decode('utf-8')
4537    if val.startswith('u:'):
4538        val = val[2:].decode('utf-8')
4539    elif val.startswith('s:'):
4540        val = val[2:].decode('base64')
4541    if err.error_verb == 'UnicodeDecodeError':
4542        raise UnicodeDecodeError(encoding, val, start, end, reason)
4543    elif err.error_verb == 'UnicodeEncodeError':
4544        raise UnicodeEncodeError(encoding, val, start, end, reason)
4545
4546
4547no_context_error_translators.register(b'UnicodeEncodeError',
4548                                      _translate_unicode_error)
4549no_context_error_translators.register(b'UnicodeDecodeError',
4550                                      _translate_unicode_error)
4551no_context_error_translators.register(b'ReadOnlyError',
4552                                      lambda err: errors.TransportNotPossible('readonly transport'))
4553no_context_error_translators.register(b'MemoryError',
4554                                      lambda err: errors.BzrError("remote server out of memory\n"
4555                                                                  "Retry non-remotely, or contact the server admin for details."))
4556no_context_error_translators.register(b'RevisionNotPresent',
4557                                      lambda err: errors.RevisionNotPresent(err.error_args[0].decode('utf-8'), err.error_args[1].decode('utf-8')))
4558
4559no_context_error_translators.register(b'BzrCheckError',
4560                                      lambda err: errors.BzrCheckError(msg=err.error_args[0].decode('utf-8')))
4561