# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Import processor that supports all Bazaar repository formats."""


import time
from .... import (
    debug,
    delta,
    errors,
    osutils,
    progress,
    revision as _mod_revision,
    )
from ....bzr.knitpack_repo import KnitPackRepository
from ....trace import (
    mutter,
    note,
    warning,
    )
import configobj
from .. import (
    branch_updater,
    cache_manager,
    helpers,
    idmapfile,
    marks_file,
    revision_store,
    )
from fastimport import (
    commands,
    errors as plugin_errors,
    processor,
    )
from fastimport.helpers import (
    invert_dictset,
    )


# How many commits before automatically reporting progress
_DEFAULT_AUTO_PROGRESS = 1000

# How many commits before automatically checkpointing
_DEFAULT_AUTO_CHECKPOINT = 10000

# How many checkpoints before automatically packing
_DEFAULT_AUTO_PACK = 4

# How many inventories to cache
_DEFAULT_INV_CACHE_SIZE = 1
_DEFAULT_CHK_INV_CACHE_SIZE = 1

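# A minimal way to drive the GenericProcessor class below, kept as a
# comment so this module stays import-safe. It mirrors how the
# fast-import command feeds a parsed stream into the processor; the
# paths and parameter values are invented for illustration:
#
#   from fastimport import parser
#   from breezy.controldir import ControlDir
#
#   cdir = ControlDir.open('project.bzr')
#   proc = GenericProcessor(cdir, params={'checkpoint': 5000})
#   with open('project.fi', 'rb') as stream:
#       proc.process(parser.ImportParser(stream).iter_commands)
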
class GenericProcessor(processor.ImportProcessor):
    """An import processor that handles basic imports.

    Current features supported:

    * blobs are cached in memory
    * commits of files and symlinks are supported
    * checkpoints automatically happen at a configurable frequency
      over and above the stream-requested checkpoints
    * timestamped progress reporting, both automatic and stream-requested
    * some basic statistics are dumped on completion.

    At checkpoints and on completion, the commit-id -> revision-id map is
    saved to a file called 'fastimport-id-map'. If the import crashes
    or is interrupted, it can be started again and this file will be
    used to skip over already-loaded revisions. The format of each line
    is "commit-id revision-id", so commit-ids cannot include spaces.

    Here are the supported parameters:

    * info - name of a hints file holding the analysis generated
      by running the fast-import-info processor in verbose mode. When
      importing large repositories, this parameter is needed so
      that the importer knows which blobs to cache intelligently.

    * trees - update the working trees before completing.
      By default, the importer updates the repository
      and branches, and the user needs to run 'bzr update' for the
      branches of interest afterwards.

    * count - only import this many commits, then exit. If not set
      or negative, all commits are imported.

    * checkpoint - automatically checkpoint every n commits over and
      above any checkpoints contained in the import stream.
      The default is 10000.

    * autopack - pack every n checkpoints. The default is 4.

    * inv-cache - number of inventories to cache.
      If not set, the default is 1.

    * mode - import algorithm to use: default or experimental.

    * import-marks - name of a file to load mark information from

    * export-marks - name of a file to save mark information to
    """

    known_params = [
        'info',
        'trees',
        'count',
        'checkpoint',
        'autopack',
        'inv-cache',
        'mode',
        'import-marks',
        'export-marks',
        ]

    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
                 prune_empty_dirs=True):
        processor.ImportProcessor.__init__(self, params, verbose)
        self.prune_empty_dirs = prune_empty_dirs
        self.controldir = bzrdir
        try:
            # Might be inside a branch
            (self.working_tree, self.branch) = bzrdir._get_tree_branch()
            self.repo = self.branch.repository
        except errors.NotBranchError:
            # Must be inside a repository
            self.working_tree = None
            self.branch = None
            self.repo = bzrdir.open_repository()

    def pre_process(self):
        self._start_time = time.time()
        self._load_info_and_params()
        if self.total_commits:
            self.note("Starting import of %d commits ..." %
                      (self.total_commits,))
        else:
            self.note("Starting import ...")
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
                                                    self.inventory_cache_size)

        if self.params.get("import-marks") is not None:
            mark_info = marks_file.import_marks(
                self.params.get("import-marks"))
            if mark_info is not None:
                self.cache_mgr.marks = mark_info
            self.skip_total = False
            self.first_incremental_commit = True
        else:
            self.first_incremental_commit = False
            self.skip_total = self._init_id_map()
            if self.skip_total:
                self.note("Found %d commits already loaded - "
                          "skipping over these ...", self.skip_total)
        self._revision_count = 0

        # mapping of tag name to revision_id
        self.tags = {}

        # Create the revision store to use for committing, if any
        self.rev_store = self._revision_store_factory()

        # Disable autopacking if the repo format supports it.
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
        if isinstance(self.repo, KnitPackRepository):
            self._original_max_pack_count = \
                self.repo._pack_collection._max_pack_count

            def _max_pack_count_for_import(total_revisions):
                return total_revisions + 1

            self.repo._pack_collection._max_pack_count = \
                _max_pack_count_for_import
        else:
            self._original_max_pack_count = None

        # Make groupcompress use the fast algorithm during importing.
        # We want to repack at the end anyhow when more information
        # is available to do a better job of saving space.
        try:
            from .... import groupcompress
            groupcompress._FAST = True
        except ImportError:
            pass

        # Create a write group. This is committed at the end of the import.
        # Checkpointing closes the current one and starts a new one.
        self.repo.start_write_group()

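    # The 'info' hints file loaded below uses plain configobj syntax as
    # produced by the fast-import-info processor. The only key consulted
    # in this method is the commit count, along these lines (the counts
    # here are invented):
    #
    #   [Command counts]
    #   commit = 12345
    #   blob = 67890
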
    def _load_info_and_params(self):
        from .. import bzr_commit_handler
        # This is currently hard-coded, but might be configurable via
        # parameters one day if that's needed
        repo_transport = self.repo.control_transport
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")

        # Load the info file, if any
        info_path = self.params.get('info')
        if info_path is not None:
            self.info = configobj.ConfigObj(info_path)
        else:
            self.info = None

        self.supports_chk = self.repo._format.supports_chks
        self.commit_handler_factory = bzr_commit_handler.CommitHandler

        # Decide how often to automatically report progress
        # (not a parameter yet)
        self.progress_every = _DEFAULT_AUTO_PROGRESS
        if self.verbose:
            # Use integer division so the frequency stays an int
            self.progress_every = self.progress_every // 10

        # Decide how often (# of commits) to automatically checkpoint
        self.checkpoint_every = int(self.params.get('checkpoint',
                                                    _DEFAULT_AUTO_CHECKPOINT))

        # Decide how often (# of checkpoints) to automatically pack
        self.checkpoint_count = 0
        self.autopack_every = int(self.params.get('autopack',
                                                  _DEFAULT_AUTO_PACK))

        # Decide how big to make the inventory cache
        cache_size = int(self.params.get('inv-cache', -1))
        if cache_size == -1:
            if self.supports_chk:
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
            else:
                cache_size = _DEFAULT_INV_CACHE_SIZE
        self.inventory_cache_size = cache_size

        # Find the maximum number of commits to import (None means all)
        # and prepare progress reporting. Just in case the info file
        # has an outdated count of commits, we store the max count
        # at which we need to terminate separately from the total used
        # for progress tracking.
        try:
            self.max_commits = int(self.params['count'])
            if self.max_commits < 0:
                self.max_commits = None
        except KeyError:
            self.max_commits = None
        if self.info is not None:
            self.total_commits = int(self.info['Command counts']['commit'])
            if (self.max_commits is not None
                    and self.total_commits > self.max_commits):
                self.total_commits = self.max_commits
        else:
            self.total_commits = self.max_commits

    def _revision_store_factory(self):
        """Make a RevisionStore based on what the repository supports."""
        return revision_store.RevisionStore(self.repo)

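    # For orientation, the write-group handling spread across pre_process,
    # checkpoint_handler and post_process amounts to roughly this call
    # sequence (a sketch of the ordering, not additional behaviour):
    #
    #   self.repo.start_write_group()      # pre_process
    #   ...commits accumulate...
    #   self.repo.commit_write_group()     # each checkpoint
    #   self._save_id_map()
    #   self.repo.start_write_group()      # reopened straight away
    #   ...
    #   self.repo.commit_write_group()     # post_process
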
    def process(self, command_iter):
        """Import data into Bazaar by processing a stream of commands.

        :param command_iter: a callable returning an iterator of commands
        """
        if self.working_tree is not None:
            self.working_tree.lock_write()
        elif self.branch is not None:
            self.branch.lock_write()
        elif self.repo is not None:
            self.repo.lock_write()
        try:
            super(GenericProcessor, self)._process(command_iter)
        finally:
            # If an unhandled exception occurred, abort the write group
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            # Release the locks
            if self.working_tree is not None:
                self.working_tree.unlock()
            elif self.branch is not None:
                self.branch.unlock()
            elif self.repo is not None:
                self.repo.unlock()

    def _process(self, command_iter):
        # If anything goes wrong, abort the write group, if any
        try:
            processor.ImportProcessor._process(self, command_iter)
        except:
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            raise

    def post_process(self):
        # Commit the current write group and checkpoint the id map
        self.repo.commit_write_group()
        self._save_id_map()

        if self.params.get("export-marks") is not None:
            marks_file.export_marks(self.params.get("export-marks"),
                                    self.cache_mgr.marks)

        if self.cache_mgr.reftracker.last_ref is None:
            # Nothing to refresh
            return

        # Update the branches
        self.note("Updating branch information ...")
        updater = branch_updater.BranchUpdater(
            self.repo, self.branch, self.cache_mgr,
            invert_dictset(self.cache_mgr.reftracker.heads),
            self.cache_mgr.reftracker.last_ref, self.tags)
        branches_updated, branches_lost = updater.update()
        self._branch_count = len(branches_updated)

        # Tell the user about branches that were not created
        if branches_lost:
            if not self.repo.is_shared():
                self.warning("Cannot import multiple branches into "
                             "a standalone branch")
            self.warning("Not creating branches for these head revisions:")
            for lost_info in branches_lost:
                head_revision = lost_info[1]
                branch_name = lost_info[0]
                self.note("\t %s = %s", head_revision, branch_name)

        # Update the working trees as requested
        self._tree_count = 0
        remind_about_update = True
        if self._branch_count == 0:
            self.note("no branches to update")
            self.note("no working trees to update")
            remind_about_update = False
        elif self.params.get('trees', False):
            trees = self._get_working_trees(branches_updated)
            if trees:
                self._update_working_trees(trees)
                remind_about_update = False
            else:
                self.warning("No working trees available to update")
        else:
            # Update just the trunk. (This is always the first branch
            # returned by the branch updater.)
            trunk_branch = branches_updated[0]
            trees = self._get_working_trees([trunk_branch])
            if trees:
                self._update_working_trees(trees)
                remind_about_update = self._branch_count > 1

        # Dump the cache stats now because we clear it before the final pack
        if self.verbose:
            self.cache_mgr.dump_stats()
        if self._original_max_pack_count:
            # We earlier disabled autopacking, creating one pack every
            # checkpoint instead. We now pack the repository to optimise
            # how data is stored.
            self.cache_mgr.clear_all()
            self._pack_repository()

        # Finish up by dumping stats & telling the user what to do next.
        self.dump_stats()
        if remind_about_update:
            # This message is explicitly not timestamped.
            note("To refresh the working tree for other branches, "
                 "use 'bzr update' inside that branch.")

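    # invert_dictset, used when building the BranchUpdater above, turns
    # the reftracker's ref -> set-of-heads mapping inside out, roughly
    # like this (the ids are invented):
    #
    #   invert_dictset({b'refs/heads/trunk': {b':100', b':101'}})
    #   => {b':100': [b'refs/heads/trunk'], b':101': [b'refs/heads/trunk']}
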
    def _update_working_trees(self, trees):
        if self.verbose:
            reporter = delta._ChangeReporter()
        else:
            reporter = None
        for wt in trees:
            self.note("Updating the working tree for %s ...", wt.basedir)
            wt.update(reporter)
            self._tree_count += 1

    def _pack_repository(self, final=True):
        # Before packing, free whatever memory we can and ensure
        # that groupcompress is configured to optimise disk space
        import gc
        if final:
            try:
                from .... import groupcompress
            except ImportError:
                pass
            else:
                groupcompress._FAST = False
        gc.collect()
        self.note("Packing repository ...")
        self.repo.pack()

        # To be conservative, packing puts the old packs and
        # indices in obsolete_packs. We err on the side of
        # optimism and clear out that directory to save space.
        self.note("Removing obsolete packs ...")
        # TODO: Use a public API for this once one exists
        repo_transport = self.repo._pack_collection.transport
        obsolete_pack_transport = repo_transport.clone('obsolete_packs')
        for name in obsolete_pack_transport.list_dir('.'):
            obsolete_pack_transport.delete(name)

        # If we're not done, free whatever memory we can
        if not final:
            gc.collect()

    def _get_working_trees(self, branches):
        """Get the working trees for branches in the repository."""
        result = []
        wt_expected = self.repo.make_working_trees()
        for br in branches:
            if br is None:
                continue
            elif br == self.branch:
                if self.working_tree:
                    result.append(self.working_tree)
            elif wt_expected:
                try:
                    result.append(br.controldir.open_workingtree())
                except errors.NoWorkingTree:
                    self.warning("No working tree for branch %s", br)
        return result

    def dump_stats(self):
        time_required = progress.str_tdelta(time.time() - self._start_time)
        rc = self._revision_count - self.skip_total
        bc = self._branch_count
        wtc = self._tree_count
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
                  rc, helpers.single_plural(rc, "revision", "revisions"),
                  bc, helpers.single_plural(bc, "branch", "branches"),
                  wtc, helpers.single_plural(wtc, "tree", "trees"),
                  time_required)

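    # The fastimport-id-map file read and written below holds one
    # "commit-id revision-id" pair per line, as described in the class
    # docstring. For example (both ids invented):
    #
    #   1 jdoe@example.com-20080707123456-abcdefabcdefabcd
    #   2 jdoe@example.com-20080707123501-0123456789abcdef
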
    def _init_id_map(self):
        """Load the id-map and check it matches the repository.

        :return: the number of entries in the map
        """
        # Currently, we just check the size. In the future, we might
        # decide to be more paranoid and check that the revision-ids
        # are identical as well.
        self.cache_mgr.marks, known = idmapfile.load_id_map(
            self.id_map_path)
        if self.cache_mgr.add_mark(b'0', _mod_revision.NULL_REVISION):
            known += 1

        existing_count = len(self.repo.all_revision_ids())
        if existing_count < known:
            raise plugin_errors.BadRepositorySize(known, existing_count)
        return known

    def _save_id_map(self):
        """Save the id-map."""
        # Save the whole lot every time. If this proves a problem, we can
        # change to 'append just the new ones' at a later time.
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.marks)

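    # A note on blob references for the handler below: a marked blob is
    # cached under its mark id (cmd.id, e.g. b':1'), while an unmarked
    # blob is cached under the SHA-1 of its content, the other dataref
    # form the fast-import format allows.
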
    def blob_handler(self, cmd):
        """Process a BlobCommand."""
        if cmd.mark is not None:
            dataref = cmd.id
        else:
            dataref = osutils.sha_strings(cmd.data)
        self.cache_mgr.store_blob(dataref, cmd.data)

    def checkpoint_handler(self, cmd):
        """Process a CheckpointCommand."""
        # Commit the current write group and start a new one
        self.repo.commit_write_group()
        self._save_id_map()
        # Track the number of automatic checkpoints done
        if cmd is None:
            self.checkpoint_count += 1
            if self.checkpoint_count % self.autopack_every == 0:
                self._pack_repository(final=False)
        self.repo.start_write_group()

    def commit_handler(self, cmd):
        """Process a CommitCommand."""
        mark = cmd.id.lstrip(b':')
        if self.skip_total and self._revision_count < self.skip_total:
            self.cache_mgr.reftracker.track_heads(cmd)
            # Check that we really do know about this commit-id
            if mark not in self.cache_mgr.marks:
                raise plugin_errors.BadRestart(mark)
            self.cache_mgr._blobs = {}
            self._revision_count += 1
            if cmd.ref.startswith(b'refs/tags/'):
                tag_name = cmd.ref[len(b'refs/tags/'):]
                self._set_tag(tag_name, cmd.id)
            return
        if self.first_incremental_commit:
            self.first_incremental_commit = None
            parents = self.cache_mgr.reftracker.track_heads(cmd)

        # 'Commit' the revision and report progress
        handler = self.commit_handler_factory(
            cmd, self.cache_mgr, self.rev_store, verbose=self.verbose,
            prune_empty_dirs=self.prune_empty_dirs)
        try:
            handler.process()
        except:
            print("ABORT: exception occurred processing commit %s" % cmd.id)
            raise
        self.cache_mgr.add_mark(mark, handler.revision_id)
        self._revision_count += 1
        self.report_progress("(%s)" % cmd.id.lstrip(b':'))

        if cmd.ref.startswith(b'refs/tags/'):
            tag_name = cmd.ref[len(b'refs/tags/'):]
            self._set_tag(tag_name, cmd.id)

        # Check if we should finish up or automatically checkpoint
        if (self.max_commits is not None
                and self._revision_count >= self.max_commits):
            self.note("Stopping after reaching requested count of commits")
            self.finished = True
        elif self._revision_count % self.checkpoint_every == 0:
            self.note("%d commits - automatic checkpoint triggered",
                      self._revision_count)
            self.checkpoint_handler(None)

    def report_progress(self, details=''):
        if self._revision_count % self.progress_every == 0:
            if self.total_commits is not None:
                counts = "%d/%d" % (self._revision_count, self.total_commits)
            else:
                counts = "%d" % (self._revision_count,)
            minutes = (time.time() - self._start_time) / 60
            revisions_added = self._revision_count - self.skip_total
            rate = revisions_added * 1.0 / minutes
            if rate > 10:
                rate_str = "at %.0f/minute " % rate
            else:
                rate_str = "at %.1f/minute " % rate
            self.note("%s commits processed %s%s" %
                      (counts, rate_str, details))

    def progress_handler(self, cmd):
        """Process a ProgressCommand."""
        # Most progress messages embedded in streams are annoying.
        # Ignore them unless in verbose mode.
        if self.verbose:
            self.note("progress %s" % (cmd.message,))

    def reset_handler(self, cmd):
        """Process a ResetCommand."""
        if cmd.ref.startswith(b'refs/tags/'):
            tag_name = cmd.ref[len(b'refs/tags/'):]
            if cmd.from_ is not None:
                self._set_tag(tag_name, cmd.from_)
            elif self.verbose:
                self.warning("ignoring reset refs/tags/%s - no from clause"
                             % tag_name)
            return

        if cmd.from_ is not None:
            self.cache_mgr.reftracker.track_heads_for_ref(cmd.ref, cmd.from_)

    def tag_handler(self, cmd):
        """Process a TagCommand."""
        if cmd.from_ is not None:
            self._set_tag(cmd.id, cmd.from_)
        else:
            self.warning("ignoring tag %s - no from clause" % cmd.id)

    def _set_tag(self, name, from_):
        """Define a tag given a name and an import 'from' reference."""
        bzr_tag_name = name.decode('utf-8', 'replace')
        bzr_rev_id = self.cache_mgr.lookup_committish(from_)
        self.tags[bzr_tag_name] = bzr_rev_id

    def feature_handler(self, cmd):
        """Process a FeatureCommand."""
        feature = cmd.feature_name
        if feature not in commands.FEATURE_NAMES:
            raise plugin_errors.UnknownFeature(feature)

    def debug(self, msg, *args):
        """Output a debug message if the appropriate -D option was given."""
        if "fast-import" in debug.debug_flags:
            msg = "%s DEBUG: %s" % (self._time_of_day(), msg)
            mutter(msg, *args)

    def note(self, msg, *args):
        """Output a note, but timestamp it."""
        msg = "%s %s" % (self._time_of_day(), msg)
        note(msg, *args)

    def warning(self, msg, *args):
        """Output a warning, but timestamp it."""
        msg = "%s WARNING: %s" % (self._time_of_day(), msg)
        warning(msg, *args)