1# Copyright (C) 2005-2010 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""Weave-era working tree objects."""
18
19from io import BytesIO
20
21from ... import (
22    conflicts as _mod_conflicts,
23    errors,
24    lock,
25    osutils,
26    revision as _mod_revision,
27    )
28from ...bzr import (
29    conflicts as _mod_bzr_conflicts,
30    inventory,
31    transform as bzr_transform,
32    xml5,
33    )
34from ...mutabletree import MutableTree
35from ...transport.local import LocalTransport
36from ...workingtree import (
37    WorkingTreeFormat,
38    )
39from ...bzr.workingtree_3 import (
40    PreDirStateWorkingTree,
41    )
42
43
def get_conflicted_stem(path):
    """Return ``path`` with its conflict suffix removed, if it has one.

    The entries of ``CONFLICT_SUFFIXES`` are tried in order; the first one
    that ``path`` ends with is stripped off and the remainder is returned.

    :param path: The path to inspect.
    :return: The path without its conflict suffix, or None when ``path``
        ends with none of the known suffixes.
    """
    stems = (path[:-len(candidate)]
             for candidate in _mod_bzr_conflicts.CONFLICT_SUFFIXES
             if path.endswith(candidate))
    # Only the first match matters; None signals "not a conflict file".
    return next(stems, None)
48
49
class WorkingTreeFormat2(WorkingTreeFormat):
    """Working tree format 2, the second working tree format.

    Relative to format 1, this format modified the hash cache.
    """

    # Class-level attributes describing this format.
    upgrade_recommended = True

    requires_normalized_unicode_filenames = True

    case_sensitive_filename = "Branch-FoRMaT"

    missing_parent_conflicts = False

    supports_versioned_directories = True

    ignore_filename = '.bzrignore'

    def __init__(self):
        """Initialise the format and record ``BzrDirFormat6`` as its match."""
        super(WorkingTreeFormat2, self).__init__()
        # BzrDirFormat6 is imported at call time, not at module import time.
        from breezy.plugins.weave_fmt.bzrdir import BzrDirFormat6
        self._matchingcontroldir = BzrDirFormat6()

    def get_format_description(self):
        """See WorkingTreeFormat.get_format_description()."""
        return "Working tree format 2"

    def _stub_initialize_on_transport(self, transport, file_mode):
        """Workaround: write working tree control files to a transport.

        BzrDirFormat6 and BzrDirFormat5 cannot represent dirs with no
        working tree, so control files are created for a remote working tree
        to ensure it can later be updated and dealt with locally.
        (See bug #43064).

        :param transport: Transport the control files are written to.
        :param file_mode: Mode passed through for both files written.
        """
        empty_inventory = inventory.Inventory()
        buf = BytesIO()
        xml5.serializer_v5.write_inventory(empty_inventory, buf, working=True)
        # Rewind so that put_file reads the serialised inventory from the
        # start of the buffer.
        buf.seek(0)
        transport.put_file('inventory', buf, file_mode)
        transport.put_bytes('pending-merges', b'', file_mode)

    def initialize(self, a_controldir, revision_id=None, from_branch=None,
                   accelerator_tree=None, hardlink=False):
        """See WorkingTreeFormat.initialize().

        :param a_controldir: Control directory to create the tree in; its
            transport must be a LocalTransport.
        :param revision_id: Revision to build the tree at.  When None, the
            last revision of the branch is used.
        :param from_branch: Branch to use; when None the branch is opened
            from ``a_controldir``.
        :param accelerator_tree: Not used by this implementation.
        :param hardlink: Not used by this implementation.
        :return: The newly built WorkingTree2.
        :raises errors.NotLocalUrl: If the control dir is not local.
        """
        control_transport = a_controldir.transport
        if not isinstance(control_transport, LocalTransport):
            raise errors.NotLocalUrl(control_transport.base)
        branch = (from_branch if from_branch is not None
                  else a_controldir.open_branch())
        if revision_id is None:
            revision_id = _mod_revision.ensure_null(branch.last_revision())
        with branch.lock_write():
            branch.generate_revision_history(revision_id)
        empty_inventory = inventory.Inventory()
        tree = WorkingTree2(
            a_controldir.root_transport.local_abspath('.'),
            branch,
            empty_inventory,
            _internal=True,
            _format=self,
            _controldir=a_controldir,
            _control_files=branch.control_files)
        basis = branch.repository.revision_tree(revision_id)
        if basis.path2id('') is not None:
            tree.set_root_id(basis.path2id(''))
        # Set the parent list and cache the basis tree; a null revision
        # means the tree has no parents.
        parents = ([] if _mod_revision.is_null(revision_id)
                   else [(revision_id, basis)])
        tree.set_parent_trees(parents)
        bzr_transform.build_tree(basis, tree)
        for post_build_hook in MutableTree.hooks['post_build_tree']:
            post_build_hook(tree)
        return tree

    def open(self, a_controldir, _found=False):
        """Return the WorkingTree object for a_controldir.

        :param a_controldir: Control directory holding the tree; its
            transport must be a LocalTransport.
        :param _found: Private parameter, do not use it.  It indicates
            whether format probing has already been done.
        :raises NotImplementedError: If called without ``_found`` set, as
            probing is not implemented here.
        :raises errors.NotLocalUrl: If the control dir is not local.
        """
        if not _found:
            # Called directly, which would require probing for the format.
            raise NotImplementedError
        control_transport = a_controldir.transport
        if not isinstance(control_transport, LocalTransport):
            raise errors.NotLocalUrl(control_transport.base)
        return WorkingTree2(
            a_controldir.root_transport.local_abspath('.'),
            _internal=True,
            _format=self,
            _controldir=a_controldir,
            _control_files=a_controldir.open_branch().control_files)
143
144
class WorkingTree2(PreDirStateWorkingTree):
    """The Format 2 working tree.

    This was the first weave based working tree.  It:
     - uses os locks for locking.
     - uses the branch last-revision.
    """

    def __init__(self, basedir, *args, **kwargs):
        """Construct the tree and make sure an inventory is loaded.

        All arguments are forwarded unchanged to the parent constructor.
        """
        super(WorkingTree2, self).__init__(basedir, *args, **kwargs)
        # Newer WorkingTrees should only have self._inventory set while they
        # hold a read lock.  WorkingTree2 is stricter: self._inventory must
        # always exist.  As this is an older format, the overhead of the
        # extra computation here is acceptable.
        if self._inventory is None:
            self.read_working_inventory()

    def _get_check_refs(self):
        """Return the references needed to perform a check of this tree."""
        tree_ref = ('trees', self.last_revision())
        return [tree_ref]

    def lock_tree_write(self):
        """See WorkingTree.lock_tree_write().

        Format2 WorkingTrees have a single lock for the branch and tree, so
        lock_tree_write() degrades to lock_write().

        :return: An object with an unlock method which will release the lock
            obtained.
        """
        self.branch.lock_write()
        try:
            write_token = self._control_files.lock_write()
            result = lock.LogicalLockResult(self.unlock, write_token)
        except:
            # Deliberately catches everything: the branch lock taken above
            # is released before the original exception is re-raised.
            self.branch.unlock()
            raise
        return result

    def unlock(self):
        """Release one level of the lock on the control files and branch.

        When the control files' lock count is 3, the tree-level cleanup,
        flushing of a modified inventory and writing of a dirty hash cache
        are done before the locks are released.

        :return: Whatever unlocking the control files returns.
        """
        # We share control files, hence the lock count compared against.
        final_tree_unlock = self._control_files._lock_count == 3
        if final_tree_unlock:
            # Do non-implementation specific cleanup.
            self._cleanup()
            # _inventory_is_modified is always False during a read lock.
            if self._inventory_is_modified:
                self.flush()
            self._write_hashcache_if_dirty()

        # Release in the reverse order of locking; the branch is unlocked
        # even if unlocking the control files fails.
        try:
            return self._control_files.unlock()
        finally:
            self.branch.unlock()

    def _iter_conflicts(self):
        """Yield each distinct conflicted stem found by list_files(), once."""
        seen = set()
        for path, _file_class, _file_kind, _entry in self.list_files():
            stem = get_conflicted_stem(path)
            if stem is None or stem in seen:
                continue
            seen.add(stem)
            yield stem

    def conflicts(self):
        """Return a ConflictList derived from the conflict files on disk.

        A stem is reported as a 'text conflict' when the stem itself and
        both its '.THIS' and '.OTHER' companions exist and are of kind
        "file"; otherwise it is reported as a 'contents conflict'.
        """
        def is_file(relpath):
            # A missing path counts as "not a file".
            try:
                return osutils.file_kind(self.abspath(relpath)) == "file"
            except errors.NoSuchFile:
                return False

        with self.lock_read():
            found = _mod_conflicts.ConflictList()
            for stem in self._iter_conflicts():
                # Checks stop at the first path that is not a regular file.
                is_text = is_file(stem) and all(
                    is_file(stem + suffix) for suffix in ('.THIS', '.OTHER'))
                ctype = 'text conflict' if is_text else 'contents conflict'
                found.append(_mod_bzr_conflicts.Conflict.factory(
                    ctype, path=stem, file_id=self.path2id(stem)))
            return found

    def set_conflicts(self, arg):
        """Not supported by this format; always raises UnsupportedOperation."""
        raise errors.UnsupportedOperation(self.set_conflicts, self)

    def add_conflicts(self, arg):
        """Not supported by this format; always raises UnsupportedOperation."""
        raise errors.UnsupportedOperation(self.add_conflicts, self)
244