# Copyright (C) 2005-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Copying of history from one branch to another.

The basic plan is that every branch knows the history of everything
that has merged into it.  As the first step of a merge, pull, or
branch operation we copy history from the source into the destination
branch.
"""

import operator

from ..lazy_import import lazy_import
lazy_import(globals(), """
from breezy import (
    tsort,
    )
from breezy.bzr import (
    versionedfile,
    vf_search,
    )
""")
from .. import (
    errors,
    ui,
    )
from ..i18n import gettext
from ..revision import NULL_REVISION
from ..trace import mutter


class RepoFetcher(object):
    """Pull revisions and texts from one repository to another.

    This should not be used directly; it's essentially an object to
    encapsulate the logic in InterRepository.fetch().  The fetch runs to
    completion inside the constructor.
    """

    def __init__(self, to_repository, from_repository, last_revision=None,
                 find_ghosts=True, fetch_spec=None):
        """Create a repo fetcher.

        :param to_repository: The repository to copy data into.
        :param from_repository: The repository to copy data from.
        :param last_revision: If set, try to limit to the data this revision
            references.
        :param fetch_spec: A SearchResult specifying which revisions to fetch.
            If set, this overrides last_revision.
        :param find_ghosts: If True search the entire history for ghosts.
        """
        # repository.fetch has the responsibility for short-circuiting
        # attempts to copy between a repository and itself.
        self.to_repository = to_repository
        self.from_repository = from_repository
        self.sink = to_repository._get_sink()
        # must not mutate self._last_revision as it's potentially a shared
        # instance
        self._last_revision = last_revision
        self._fetch_spec = fetch_spec
        self.find_ghosts = find_ghosts
        # The source is read-locked for the duration of the fetch; the fetch
        # itself happens eagerly, before __init__ returns.
        with self.from_repository.lock_read():
            mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
                   str(self.from_repository),
                   str(self.from_repository._format),
                   str(self.to_repository),
                   str(self.to_repository._format))
            self.__fetch()

    def __fetch(self):
        """Primary worker function.

        This initialises all the needed variables, and then fetches the
        requested revisions, finally clearing the progress bar.
        """
        # Roughly this is what we're aiming for fetch to become:
        #
        # missing = self.sink.insert_stream(self.source.get_stream(search))
        # if missing:
        #     missing = self.sink.insert_stream(self.source.get_items(missing))
        # assert not missing
        self.count_total = 0
        self.file_ids_names = {}
        with ui.ui_factory.nested_progress_bar() as pb:
            pb.show_pct = pb.show_count = False
            pb.update(gettext("Finding revisions"), 0, 2)
            search_result = self._revids_to_fetch()
            mutter('fetching: %s', str(search_result))
            # Nothing to transfer: skip the expensive streaming phase.
            if search_result.is_empty():
                return
            pb.update(gettext("Fetching revisions"), 1, 2)
            self._fetch_everything_for_search(search_result)

    def _fetch_everything_for_search(self, search):
        """Fetch all data for the given set of revisions.

        :param search: A SearchResult describing the revisions to transfer.
        :raises errors.IncompatibleRepositories: if the source is rich-root
            but the target is not (downgrading root data would lose it).
        :raises AssertionError: if a second insert pass still reports missing
            keys or leaves uncommitted resume tokens.
        """
        # The first phase is "file".  We pass the progress bar for it directly
        # into item_keys_introduced_by, which has more information about how
        # that phase is progressing than we do.  Progress updates for the
        # other phases are taken care of in this function.
        # XXX: there should be a clear owner of the progress reporting.
        # Perhaps item_keys_introduced_by should have a richer API than it
        # does at the moment, so that it can feed the progress information
        # back to this function?
        if (self.from_repository._format.rich_root_data and
                not self.to_repository._format.rich_root_data):
            raise errors.IncompatibleRepositories(
                self.from_repository, self.to_repository,
                "different rich-root support")
        with ui.ui_factory.nested_progress_bar() as pb:
            pb.update("Get stream source")
            source = self.from_repository._get_source(
                self.to_repository._format)
            stream = source.get_stream(search)
            from_format = self.from_repository._format
            pb.update("Inserting stream")
            resume_tokens, missing_keys = self.sink.insert_stream(
                stream, from_format, [])
            if missing_keys:
                # First pass left gaps (e.g. parent inventories/texts not in
                # the original stream); fetch just those keys and retry once,
                # resuming the earlier partial write via resume_tokens.
                pb.update("Missing keys")
                stream = source.get_stream_for_missing_keys(missing_keys)
                pb.update("Inserting missing keys")
                resume_tokens, missing_keys = self.sink.insert_stream(
                    stream, from_format, resume_tokens)
            # After the second pass everything must be present and committed;
            # anything else indicates a broken stream source or sink.
            if missing_keys:
                raise AssertionError(
                    "second push failed to complete a fetch %r." % (
                        missing_keys,))
            if resume_tokens:
                raise AssertionError(
                    "second push failed to commit the fetch %r." % (
                        resume_tokens,))
            pb.update("Finishing stream")
            self.sink.finished()

    def _revids_to_fetch(self):
        """Determines the exact revisions needed from self.from_repository to
        install self._last_revision in self.to_repository.

        :returns: A SearchResult of some sort.  (Possibly a
            PendingAncestryResult, EmptySearchResult, etc.)
        """
        if self._fetch_spec is not None:
            # The fetch spec is already a concrete search result.
            return self._fetch_spec
        elif self._last_revision == NULL_REVISION:
            # fetch_spec is None + last_revision is null => empty fetch.
            # explicit limit of no revisions needed
            return vf_search.EmptySearchResult()
        elif self._last_revision is not None:
            # Fetch the ancestry of one specific revision, minus what the
            # target already has.
            return vf_search.NotInOtherForRevs(
                self.to_repository, self.from_repository,
                [self._last_revision],
                find_ghosts=self.find_ghosts).execute()
        else:  # self._last_revision is None:
            # No limit given: fetch everything the target is missing.
            return vf_search.EverythingNotInOther(
                self.to_repository, self.from_repository,
                find_ghosts=self.find_ghosts).execute()


class Inter1and2Helper(object):
    """Helper for operations that convert data from model 1 and 2

    This is for use by fetchers and converters.
    """

    # This is a class variable so that the test suite can override it.
    known_graph_threshold = 100

    def __init__(self, source):
        """Constructor.

        :param source: The repository data comes from
        """
        self.source = source

    def iter_rev_trees(self, revs):
        """Iterate through RevisionTrees efficiently.

        Additionally, the inventory's revision_id is set if unset.

        Trees are retrieved in batches of 100, and then yielded in the order
        they were requested.

        :param revs: A list of revision ids
        """
        # In case that revs is not a list.
        revs = list(revs)
        while revs:
            for tree in self.source.revision_trees(revs[:100]):
                if tree.root_inventory.revision_id is None:
                    tree.root_inventory.revision_id = tree.get_revision_id()
                yield tree
            revs = revs[100:]

    def _find_root_ids(self, revs, parent_map, graph):
        """Map each revision (and its known parents) to its root file id.

        :param revs: the revision ids to look up root ids for.
        :param parent_map: dict of rev_id -> parent rev_ids; the parents are
            also resolved so callers can compute root parent keys.
        :param graph: a graph used to filter parents down to revisions that
            are actually present in the source.
        :return: dict of rev_id -> root file id.
        """
        revision_root = {}
        for tree in self.iter_rev_trees(revs):
            root_id = tree.path2id('')
            revision_id = tree.get_file_revision(u'')
            revision_root[revision_id] = root_id
        # Find out which parents we don't already know root ids for
        parents = set(parent_map.values())
        parents.difference_update(revision_root)
        parents.discard(NULL_REVISION)
        # Limit to revisions present in the versionedfile
        parents = graph.get_parent_map(parents)
        for tree in self.iter_rev_trees(parents):
            root_id = tree.path2id('')
            revision_root[tree.get_revision_id()] = root_id
        return revision_root

    def generate_root_texts(self, revs):
        """Generate VersionedFiles for all root ids.

        :param revs: the revisions to include
        :return: a list with a single ('texts', stream) pair, suitable for
            feeding to a repository sink.
        """
        graph = self.source.get_graph()
        parent_map = graph.get_parent_map(revs)
        rev_order = tsort.topo_sort(parent_map)
        rev_id_to_root_id = self._find_root_ids(revs, parent_map, graph)
        root_id_order = [(rev_id_to_root_id[rev_id], rev_id) for rev_id in
                         rev_order]
        # Guaranteed stable, this groups all the file id operations together
        # retaining topological order within the revisions of a file id.
        # File id splits and joins would invalidate this, but they don't exist
        # yet, and are unlikely to in non-rich-root environments anyway.
        root_id_order.sort(key=operator.itemgetter(0))
        # Create a record stream containing the roots to create.
        if len(revs) > self.known_graph_threshold:
            # For big fetches a precomputed known graph makes the repeated
            # heads() calls in _new_root_data_stream cheaper.
            graph = self.source.get_known_graph_ancestry(revs)
        new_roots_stream = _new_root_data_stream(
            root_id_order, rev_id_to_root_id, parent_map, self.source, graph)
        return [('texts', new_roots_stream)]


def _new_root_data_stream(
        root_keys_to_create, rev_id_to_root_id_map, parent_map, repo,
        graph=None):
    """Generate a texts substream of synthesised root entries.

    Used in fetches that do rich-root upgrades.

    :param root_keys_to_create: iterable of (root_id, rev_id) pairs describing
        the root entries to create.
    :param rev_id_to_root_id_map: dict of known rev_id -> root_id mappings for
        calculating the parents.  If a parent rev_id is not found here then it
        will be recalculated.
    :param parent_map: a parent map for all the revisions in
        root_keys_to_create.
    :param repo: the repository used to look up trees for parent revisions
        that are not covered by rev_id_to_root_id_map.
    :param graph: a graph to use instead of repo.get_graph().
    """
    for root_key in root_keys_to_create:
        root_id, rev_id = root_key
        parent_keys = _parent_keys_for_root_version(
            root_id, rev_id, rev_id_to_root_id_map, parent_map, repo, graph)
        # Roots carry no text of their own, hence the empty chunk list.
        yield versionedfile.ChunkedContentFactory(
            root_key, parent_keys, None, [])


def _parent_keys_for_root_version(
        root_id, rev_id, rev_id_to_root_id_map, parent_map, repo, graph=None):
    """Get the parent keys for a given root id.

    A helper function for _new_root_data_stream.
    """
    # Include direct parents of the revision, but only if they used the same
    # root_id and are heads.
    rev_parents = parent_map[rev_id]
    parent_ids = []
    for parent_id in rev_parents:
        if parent_id == NULL_REVISION:
            continue
        if parent_id not in rev_id_to_root_id_map:
            # We probably didn't read this revision, go spend the extra effort
            # to actually check
            try:
                tree = repo.revision_tree(parent_id)
            except errors.NoSuchRevision:
                # Ghost, fill out rev_id_to_root_id in case we encounter this
                # again.
                # But set parent_root_id to None since we don't really know
                parent_root_id = None
            else:
                parent_root_id = tree.path2id('')
            rev_id_to_root_id_map[parent_id] = None
            # XXX: why not:
            #   rev_id_to_root_id_map[parent_id] = parent_root_id
            # memory consumption maybe?
        else:
            parent_root_id = rev_id_to_root_id_map[parent_id]
        if root_id == parent_root_id:
            # With stacking we _might_ want to refer to a non-local revision,
            # but this code path only applies when we have the full content
            # available, so ghosts really are ghosts, not just the edge of
            # local data.
            parent_ids.append(parent_id)
        else:
            # root_id may be in the parent anyway.
            try:
                tree = repo.revision_tree(parent_id)
            except errors.NoSuchRevision:
                # ghost, can't refer to it.
                pass
            else:
                try:
                    parent_ids.append(
                        tree.get_file_revision(
                            tree.id2path(root_id, recurse='none')))
                except errors.NoSuchId:
                    # not in the tree
                    pass
    # Drop non-head parents
    if graph is None:
        graph = repo.get_graph()
    heads = graph.heads(parent_ids)
    selected_ids = []
    # Preserve the original parent order while de-duplicating and keeping
    # only heads.
    for parent_id in parent_ids:
        if parent_id in heads and parent_id not in selected_ids:
            selected_ids.append(parent_id)
    parent_keys = [(root_id, parent_id) for parent_id in selected_ids]
    return parent_keys


class TargetRepoKinds(object):
    """An enum-like set of constants.

    They are the possible values of FetchSpecFactory.target_repo_kinds.
    """

    PREEXISTING = 'preexisting'
    STACKED = 'stacked'
    EMPTY = 'empty'


class FetchSpecFactory(object):
    """A helper for building the best fetch spec for a sprout call.

    Factors that go into determining the sort of fetch to perform:
     * did the caller specify any revision IDs?
     * did the caller specify a source branch (need to fetch its
       heads_to_fetch(), usually the tip + tags)
     * is there an existing target repo (don't need to refetch revs it
       already has)
     * target is stacked?  (similar to pre-existing target repo: even if
       the target itself is new don't want to refetch existing revs)

    :ivar source_branch: the source branch if one specified, else None.
    :ivar source_branch_stop_revision_id: fetch up to this revision of
        source_branch, rather than its tip.
    :ivar source_repo: the source repository if one found, else None.
    :ivar target_repo: the target repository acquired by sprout.
    :ivar target_repo_kind: one of the TargetRepoKinds constants.
    """

    def __init__(self):
        self._explicit_rev_ids = set()
        self.source_branch = None
        self.source_branch_stop_revision_id = None
        self.source_repo = None
        self.target_repo = None
        self.target_repo_kind = None
        self.limit = None

    def add_revision_ids(self, revision_ids):
        """Add revision_ids to the set of revision_ids to be fetched."""
        self._explicit_rev_ids.update(revision_ids)

    def make_fetch_spec(self):
        """Build a SearchResult or PendingAncestryResult or etc.

        :raises AssertionError: if target_repo_kind or source_repo was never
            set on this factory.
        :raises NotImplementedError: if a limit is set without a source
            branch.
        """
        if self.target_repo_kind is None or self.source_repo is None:
            raise AssertionError(
                'Incomplete FetchSpecFactory: %r' % (self.__dict__,))
        if len(self._explicit_rev_ids) == 0 and self.source_branch is None:
            if self.limit is not None:
                raise NotImplementedError(
                    "limit is only supported with a source branch set")
            # Caller hasn't specified any revisions or source branch
            if self.target_repo_kind == TargetRepoKinds.EMPTY:
                return vf_search.EverythingResult(self.source_repo)
            else:
                # We want everything not already in the target (or target's
                # fallbacks).
                return vf_search.EverythingNotInOther(
                    self.target_repo, self.source_repo).execute()
        heads_to_fetch = set(self._explicit_rev_ids)
        if self.source_branch is not None:
            must_fetch, if_present_fetch = self.source_branch.heads_to_fetch()
            if self.source_branch_stop_revision_id is not None:
                # Replace the tip rev from must_fetch with the stop revision
                # XXX: this might be wrong if the tip rev is also in the
                # must_fetch set for other reasons (e.g. it's the tip of
                # multiple loom threads?), but then it's pretty unclear what
                # it should mean to specify a stop_revision in that case
                # anyway.
                must_fetch.discard(self.source_branch.last_revision())
                must_fetch.add(self.source_branch_stop_revision_id)
            heads_to_fetch.update(must_fetch)
        else:
            if_present_fetch = set()
        if self.target_repo_kind == TargetRepoKinds.EMPTY:
            # PendingAncestryResult does not raise errors if a requested head
            # is absent.  Ideally it would support the
            # required_ids/if_present_ids distinction, but in practice
            # heads_to_fetch will almost certainly be present so this doesn't
            # matter much.
            all_heads = heads_to_fetch.union(if_present_fetch)
            ret = vf_search.PendingAncestryResult(all_heads, self.source_repo)
            if self.limit is not None:
                # Apply the limit by truncating a topological ordering of the
                # pending ancestry and rebuilding a search result from it.
                graph = self.source_repo.get_graph()
                topo_order = list(graph.iter_topo_order(ret.get_keys()))
                result_set = topo_order[:self.limit]
                ret = self.source_repo.revision_ids_to_search_result(
                    result_set)
            return ret
        else:
            return vf_search.NotInOtherForRevs(
                self.target_repo, self.source_repo,
                required_ids=heads_to_fetch,
                if_present_ids=if_present_fetch,
                limit=self.limit).execute()