# Copyright (C) 2005-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Weave-era working tree objects."""

from io import BytesIO

from ... import (
    conflicts as _mod_conflicts,
    errors,
    lock,
    osutils,
    revision as _mod_revision,
    )
from ...bzr import (
    conflicts as _mod_bzr_conflicts,
    inventory,
    transform as bzr_transform,
    xml5,
    )
from ...mutabletree import MutableTree
from ...transport.local import LocalTransport
from ...workingtree import (
    WorkingTreeFormat,
    )
from ...bzr.workingtree_3 import (
    PreDirStateWorkingTree,
    )


def get_conflicted_stem(path):
    """Return ``path`` with its conflict suffix stripped.

    :param path: A path string to examine.
    :return: ``path`` minus the first entry of
        ``_mod_bzr_conflicts.CONFLICT_SUFFIXES`` that it ends with, or
        ``None`` when it ends with none of them.
    """
    for suffix in _mod_bzr_conflicts.CONFLICT_SUFFIXES:
        if path.endswith(suffix):
            return path[:-len(suffix)]
    return None


def _is_plain_file(abspath):
    """Return True if ``osutils.file_kind(abspath)`` is ``"file"``.

    A path for which ``osutils.file_kind`` raises ``errors.NoSuchFile`` is
    reported as False; any other exception propagates to the caller.
    """
    try:
        return osutils.file_kind(abspath) == "file"
    except errors.NoSuchFile:
        return False


class WorkingTreeFormat2(WorkingTreeFormat):
    """The second working tree format.

    This format modified the hash cache from the format 1 hash cache.
    """

    upgrade_recommended = True

    requires_normalized_unicode_filenames = True

    case_sensitive_filename = "Branch-FoRMaT"

    missing_parent_conflicts = False

    supports_versioned_directories = True

    ignore_filename = '.bzrignore'

    def get_format_description(self):
        """See WorkingTreeFormat.get_format_description()."""
        return "Working tree format 2"

    def _stub_initialize_on_transport(self, transport, file_mode):
        """Workaround: create control files for a remote working tree.

        This ensures that it can later be updated and dealt with locally,
        since BzrDirFormat6 and BzrDirFormat5 cannot represent dirs with
        no working tree.  (See bug #43064).

        :param transport: Transport on which the 'inventory' and
            'pending-merges' files are written.
        :param file_mode: Mode passed through to ``put_file``/``put_bytes``.
        """
        sio = BytesIO()
        inv = inventory.Inventory()
        xml5.serializer_v5.write_inventory(inv, sio, working=True)
        # Rewind so put_file() reads the serialised inventory from the start.
        sio.seek(0)
        transport.put_file('inventory', sio, file_mode)
        transport.put_bytes('pending-merges', b'', file_mode)

    def initialize(self, a_controldir, revision_id=None, from_branch=None,
                   accelerator_tree=None, hardlink=False):
        """See WorkingTreeFormat.initialize().

        :param a_controldir: Control directory to create the tree in; its
            transport must be a LocalTransport.
        :param revision_id: Revision to build the tree at.  When None the
            branch's last revision is used.
        :param from_branch: Branch to use instead of opening the one in
            ``a_controldir``.
        :param accelerator_tree: Accepted for interface compatibility; not
            used by this implementation.
        :param hardlink: Accepted for interface compatibility; not used by
            this implementation.
        :return: The newly built WorkingTree2.
        :raises errors.NotLocalUrl: If the control directory's transport is
            not local.
        """
        if not isinstance(a_controldir.transport, LocalTransport):
            raise errors.NotLocalUrl(a_controldir.transport.base)
        if from_branch is not None:
            branch = from_branch
        else:
            branch = a_controldir.open_branch()
        if revision_id is None:
            revision_id = _mod_revision.ensure_null(branch.last_revision())
        with branch.lock_write():
            branch.generate_revision_history(revision_id)
        inv = inventory.Inventory()
        wt = WorkingTree2(a_controldir.root_transport.local_abspath('.'),
                          branch,
                          inv,
                          _internal=True,
                          _format=self,
                          _controldir=a_controldir,
                          _control_files=branch.control_files)
        basis_tree = branch.repository.revision_tree(revision_id)
        # Look the root id up once and reuse it.
        root_id = basis_tree.path2id('')
        if root_id is not None:
            wt.set_root_id(root_id)
        # set the parent list and cache the basis tree.
        if _mod_revision.is_null(revision_id):
            parent_trees = []
        else:
            parent_trees = [(revision_id, basis_tree)]
        wt.set_parent_trees(parent_trees)
        bzr_transform.build_tree(basis_tree, wt)
        for hook in MutableTree.hooks['post_build_tree']:
            hook(wt)
        return wt

    def __init__(self):
        super(WorkingTreeFormat2, self).__init__()
        # Imported here rather than at module level.
        from breezy.plugins.weave_fmt.bzrdir import BzrDirFormat6
        self._matchingcontroldir = BzrDirFormat6()

    def open(self, a_controldir, _found=False):
        """Return the WorkingTree object for a_controldir

        _found is a private parameter, do not use it. It is used to indicate
        if format probing has already been done.

        :raises NotImplementedError: If ``_found`` is false.
        :raises errors.NotLocalUrl: If the control directory's transport is
            not local.
        """
        if not _found:
            # we are being called directly and must probe.
            raise NotImplementedError
        if not isinstance(a_controldir.transport, LocalTransport):
            raise errors.NotLocalUrl(a_controldir.transport.base)
        wt = WorkingTree2(a_controldir.root_transport.local_abspath('.'),
                          _internal=True,
                          _format=self,
                          _controldir=a_controldir,
                          _control_files=a_controldir.open_branch().control_files)
        return wt


class WorkingTree2(PreDirStateWorkingTree):
    """This is the Format 2 working tree.

    This was the first weave based working tree.
     - uses os locks for locking.
     - uses the branch last-revision.
    """

    def __init__(self, basedir, *args, **kwargs):
        super(WorkingTree2, self).__init__(basedir, *args, **kwargs)
        # WorkingTree2 has more of a constraint that self._inventory must
        # exist. Because this is an older format, we don't mind the overhead
        # caused by the extra computation here.

        # Newer WorkingTree's should only have self._inventory set when they
        # have a read lock.
        if self._inventory is None:
            self.read_working_inventory()

    def _get_check_refs(self):
        """Return the references needed to perform a check of this tree."""
        return [('trees', self.last_revision())]

    def lock_tree_write(self):
        """See WorkingTree.lock_tree_write().

        In Format2 WorkingTrees we have a single lock for the branch and tree
        so lock_tree_write() degrades to lock_write().

        :return: An object with an unlock method which will release the lock
            obtained.
        """
        self.branch.lock_write()
        try:
            token = self._control_files.lock_write()
            return lock.LogicalLockResult(self.unlock, token)
        except BaseException:
            # Taking the control-files lock failed: give back the branch
            # lock acquired above, then let the original error propagate.
            self.branch.unlock()
            raise

    def unlock(self):
        """Release one level of lock on the tree and then on the branch.

        When the shared control files report a lock count of 3, cleanup is
        run first: the inventory is flushed if modified and the hash cache
        is written if dirty.

        :return: Whatever ``self._control_files.unlock()`` returns.
        """
        # we share control files:
        if self._control_files._lock_count == 3:
            # do non-implementation specific cleanup
            self._cleanup()
            # _inventory_is_modified is always False during a read lock.
            if self._inventory_is_modified:
                self.flush()
            self._write_hashcache_if_dirty()

        # reverse order of locking.
        try:
            return self._control_files.unlock()
        finally:
            self.branch.unlock()

    def _iter_conflicts(self):
        """Yield each distinct conflicted stem found by ``list_files()``.

        A stem is the listed path with its conflict suffix removed (see
        ``get_conflicted_stem``); each stem is yielded only once.
        """
        conflicted = set()
        for path, file_class, file_kind, entry in self.list_files():
            stem = get_conflicted_stem(path)
            if stem is None or stem in conflicted:
                continue
            conflicted.add(stem)
            yield stem

    def conflicts(self):
        """Return a ConflictList built from conflict-suffixed files.

        For every stem from ``_iter_conflicts()`` the conflict is a
        'text conflict' when the stem itself and its '.THIS' and '.OTHER'
        companions are all of kind "file"; otherwise it is a
        'contents conflict'.  The tree is read-locked while scanning.
        """
        with self.lock_read():
            conflicts = _mod_conflicts.ConflictList()
            for conflicted in self._iter_conflicts():
                # all() short-circuits, so later paths are only examined
                # while every earlier one was a plain file.
                text = all(
                    _is_plain_file(self.abspath(conflicted + suffix))
                    for suffix in ('', '.THIS', '.OTHER'))
                ctype = 'text conflict' if text else 'contents conflict'
                conflicts.append(_mod_bzr_conflicts.Conflict.factory(
                    ctype,
                    path=conflicted,
                    file_id=self.path2id(conflicted)))
            return conflicts

    def set_conflicts(self, arg):
        """Unsupported for this format; always raises UnsupportedOperation."""
        raise errors.UnsupportedOperation(self.set_conflicts, self)

    def add_conflicts(self, arg):
        """Unsupported for this format; always raises UnsupportedOperation."""
        raise errors.UnsupportedOperation(self.add_conflicts, self)