# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Import processor that supports all Bazaar repository formats."""


import time
from .... import (
    debug,
    delta,
    errors,
    osutils,
    progress,
    revision as _mod_revision,
    )
from ....bzr.knitpack_repo import KnitPackRepository
from ....trace import (
    mutter,
    note,
    warning,
    )
import configobj
from .. import (
    branch_updater,
    cache_manager,
    helpers,
    idmapfile,
    marks_file,
    revision_store,
    )
from fastimport import (
    commands,
    errors as plugin_errors,
    processor,
    )
from fastimport.helpers import (
    invert_dictset,
    )


# How many commits before automatically reporting progress
_DEFAULT_AUTO_PROGRESS = 1000

# How many commits before automatically checkpointing
_DEFAULT_AUTO_CHECKPOINT = 10000

# How many checkpoints before automatically packing
_DEFAULT_AUTO_PACK = 4

# How many inventories to cache
_DEFAULT_INV_CACHE_SIZE = 1
_DEFAULT_CHK_INV_CACHE_SIZE = 1
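
# With the defaults above, progress is reported every 1000 commits (every
# 100 in verbose mode), an automatic checkpoint is taken every 10000
# commits, and the repository is packed on every 4th automatic checkpoint,
# i.e. roughly every 40000 commits. The checkpoint and autopack
# frequencies can be overridden via the 'checkpoint' and 'autopack'
# parameters documented on GenericProcessor below.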


class GenericProcessor(processor.ImportProcessor):
    """An import processor that handles basic imports.

    Current features supported:

    * blobs are cached in memory
    * commits containing files and symlinks are supported
    * checkpoints automatically happen at a configurable frequency,
      over and above the checkpoints requested in the stream
    * timestamped progress reporting, both automatic and stream-requested
    * some basic statistics are dumped on completion.

    At checkpoints and on completion, the commit-id -> revision-id map is
    saved to a file called 'fastimport-id-map'. If the import crashes
    or is interrupted, it can be started again and this file will be
    used to skip over already loaded revisions. The format of each line
    is "commit-id revision-id" so commit-ids cannot include spaces.

    Here are the supported parameters:

    * info - name of a hints file holding the analysis generated
      by running the fast-import-info processor in verbose mode. When
      importing large repositories, this parameter is needed so
      that the importer knows which blobs to cache.

    * trees - update the working trees before completing.
      By default, the importer updates the repository
      and branches and the user needs to run 'bzr update' for the
      branches of interest afterwards.

    * count - only import this many commits then exit. If not set
      or negative, all commits are imported.

    * checkpoint - automatically checkpoint every n commits over and
      above any checkpoints contained in the import stream.
      The default is 10000.

    * autopack - pack every n checkpoints. The default is 4.

    * inv-cache - number of inventories to cache.
      If not set, the default is 1.

    * mode - import algorithm to use: default or experimental.

    * import-marks - name of a file to load mark information from

    * export-marks - name of a file to save mark information to
    """
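
    # A minimal usage sketch (illustrative only; normally the plugin's
    # ``fast-import`` command constructs this processor and feeds it the
    # parsed command stream):
    #
    #   from breezy import controldir
    #   from fastimport import parser
    #
    #   control = controldir.ControlDir.open('.')
    #   proc = GenericProcessor(control, params={'checkpoint': '1000'})
    #   with open('project.fi', 'rb') as stream:
    #       proc.process(parser.ImportParser(stream).iter_commands)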

    known_params = [
        'info',
        'trees',
        'count',
        'checkpoint',
        'autopack',
        'inv-cache',
        'mode',
        'import-marks',
        'export-marks',
        ]

    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
                 prune_empty_dirs=True):
        processor.ImportProcessor.__init__(self, params, verbose)
        self.prune_empty_dirs = prune_empty_dirs
        self.controldir = bzrdir
        try:
            # Might be inside a branch
            (self.working_tree, self.branch) = bzrdir._get_tree_branch()
            self.repo = self.branch.repository
        except errors.NotBranchError:
            # Must be inside a repository
            self.working_tree = None
            self.branch = None
            self.repo = bzrdir.open_repository()

    def pre_process(self):
        self._start_time = time.time()
        self._load_info_and_params()
        if self.total_commits:
            self.note("Starting import of %d commits ..." %
                      (self.total_commits,))
        else:
            self.note("Starting import ...")
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
                                                    self.inventory_cache_size)

        if self.params.get("import-marks") is not None:
            mark_info = marks_file.import_marks(
                self.params.get("import-marks"))
            if mark_info is not None:
                self.cache_mgr.marks = mark_info
            self.skip_total = False
            self.first_incremental_commit = True
        else:
            self.first_incremental_commit = False
            self.skip_total = self._init_id_map()
            if self.skip_total:
                self.note("Found %d commits already loaded - "
                          "skipping over these ...", self.skip_total)
        self._revision_count = 0

        # mapping of tag name to revision_id
        self.tags = {}

        # Create the revision store to use for committing, if any
        self.rev_store = self._revision_store_factory()

        # Disable autopacking if the repo format supports it.
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
        if isinstance(self.repo, KnitPackRepository):
            self._original_max_pack_count = \
                self.repo._pack_collection._max_pack_count

            def _max_pack_count_for_import(total_revisions):
                return total_revisions + 1
            self.repo._pack_collection._max_pack_count = \
                _max_pack_count_for_import
        else:
            self._original_max_pack_count = None

        # Make groupcompress use the fast algorithm during importing.
        # We want to repack at the end anyhow when more information
        # is available to do a better job of saving space.
        try:
            from .... import groupcompress
            groupcompress._FAST = True
        except ImportError:
            pass

        # Create a write group. This is committed at the end of the import.
        # Checkpointing closes the current one and starts a new one.
        self.repo.start_write_group()

    def _load_info_and_params(self):
        from .. import bzr_commit_handler
        # This is currently hard-coded but might be configurable via
        # parameters one day if that's needed
        repo_transport = self.repo.control_transport
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")

        # Load the info file, if any
        info_path = self.params.get('info')
        if info_path is not None:
            self.info = configobj.ConfigObj(info_path)
        else:
            self.info = None
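
        # For reference, the hints file written by the fast-import-info
        # processor is a configobj document; the section consulted below
        # looks roughly like this (counts are illustrative):
        #
        #   [Command counts]
        #   commit = 35000
        #   blob = 120000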

        self.supports_chk = self.repo._format.supports_chks
        self.commit_handler_factory = bzr_commit_handler.CommitHandler

        # Decide how often to automatically report progress
        # (not a parameter yet)
        self.progress_every = _DEFAULT_AUTO_PROGRESS
        if self.verbose:
            self.progress_every = self.progress_every // 10

        # Decide how often (# of commits) to automatically checkpoint
        self.checkpoint_every = int(self.params.get('checkpoint',
                                                    _DEFAULT_AUTO_CHECKPOINT))

        # Decide how often (# of checkpoints) to automatically pack
        self.checkpoint_count = 0
        self.autopack_every = int(self.params.get('autopack',
                                                  _DEFAULT_AUTO_PACK))

        # Decide how big to make the inventory cache
        cache_size = int(self.params.get('inv-cache', -1))
        if cache_size == -1:
            if self.supports_chk:
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
            else:
                cache_size = _DEFAULT_INV_CACHE_SIZE
        self.inventory_cache_size = cache_size

        # Find the maximum number of commits to import (None means all)
        # and prepare progress reporting. In case the info file has an
        # outdated count of commits, the count at which we must stop is
        # stored separately from the total used for progress tracking.
        try:
            self.max_commits = int(self.params['count'])
            if self.max_commits < 0:
                self.max_commits = None
        except KeyError:
            self.max_commits = None
        if self.info is not None:
            self.total_commits = int(self.info['Command counts']['commit'])
            if (self.max_commits is not None
                    and self.total_commits > self.max_commits):
                self.total_commits = self.max_commits
        else:
            self.total_commits = self.max_commits

    def _revision_store_factory(self):
        """Make a RevisionStore based on what the repository supports."""
        return revision_store.RevisionStore(self.repo)

    def process(self, command_iter):
        """Import data into Bazaar by processing a stream of commands.

        :param command_iter: an iterator providing commands
        """
        if self.working_tree is not None:
            self.working_tree.lock_write()
        elif self.branch is not None:
            self.branch.lock_write()
        elif self.repo is not None:
            self.repo.lock_write()
        try:
            super(GenericProcessor, self)._process(command_iter)
        finally:
            # If an unhandled exception occurred, abort the write group
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            # Release the locks
            if self.working_tree is not None:
                self.working_tree.unlock()
            elif self.branch is not None:
                self.branch.unlock()
            elif self.repo is not None:
                self.repo.unlock()

    def _process(self, command_iter):
        # If anything goes wrong, abort the write group, if any
        try:
            processor.ImportProcessor._process(self, command_iter)
        except:
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            raise

    def post_process(self):
        # Commit the current write group and checkpoint the id map
        self.repo.commit_write_group()
        self._save_id_map()

        if self.params.get("export-marks") is not None:
            marks_file.export_marks(self.params.get("export-marks"),
                                    self.cache_mgr.marks)

        if self.cache_mgr.reftracker.last_ref is None:
            # Nothing to refresh
            return

        # Update the branches
        self.note("Updating branch information ...")
        updater = branch_updater.BranchUpdater(
            self.repo, self.branch, self.cache_mgr,
            invert_dictset(self.cache_mgr.reftracker.heads),
            self.cache_mgr.reftracker.last_ref, self.tags)
        branches_updated, branches_lost = updater.update()
        self._branch_count = len(branches_updated)

        # Tell the user about branches that were not created
        if branches_lost:
            if not self.repo.is_shared():
                self.warning("Cannot import multiple branches into "
                             "a standalone branch")
            self.warning("Not creating branches for these head revisions:")
            for lost_info in branches_lost:
                head_revision = lost_info[1]
                branch_name = lost_info[0]
                self.note("\t %s = %s", head_revision, branch_name)

        # Update the working trees as requested
        self._tree_count = 0
        remind_about_update = True
        if self._branch_count == 0:
            self.note("no branches to update")
            self.note("no working trees to update")
            remind_about_update = False
        elif self.params.get('trees', False):
            trees = self._get_working_trees(branches_updated)
            if trees:
                self._update_working_trees(trees)
                remind_about_update = False
            else:
                self.warning("No working trees available to update")
        else:
            # Update just the trunk. (This is always the first branch
            # returned by the branch updater.)
            trunk_branch = branches_updated[0]
            trees = self._get_working_trees([trunk_branch])
            if trees:
                self._update_working_trees(trees)
                remind_about_update = self._branch_count > 1

        # Dump the cache stats now because we clear it before the final pack
        if self.verbose:
            self.cache_mgr.dump_stats()
        if self._original_max_pack_count:
            # We earlier disabled autopacking, creating one pack every
            # checkpoint instead. We now pack the repository to optimise
            # how data is stored.
            self.cache_mgr.clear_all()
            self._pack_repository()

        # Finish up by dumping stats & telling the user what to do next.
        self.dump_stats()
        if remind_about_update:
            # This message is explicitly not timestamped.
            note("To refresh the working tree for other branches, "
                 "use 'bzr update' inside that branch.")

    def _update_working_trees(self, trees):
        if self.verbose:
            reporter = delta._ChangeReporter()
        else:
            reporter = None
        for wt in trees:
            self.note("Updating the working tree for %s ...", wt.basedir)
            wt.update(reporter)
            self._tree_count += 1

    def _pack_repository(self, final=True):
        # Before packing, free whatever memory we can and ensure
        # that groupcompress is configured to optimise disk space
        import gc
        if final:
            try:
                from .... import groupcompress
            except ImportError:
                pass
            else:
                groupcompress._FAST = False
        gc.collect()
        self.note("Packing repository ...")
        self.repo.pack()

        # To be conservative, packing puts the old packs and
        # indices in obsolete_packs. We err on the side of
        # optimism and clear out that directory to save space.
        self.note("Removing obsolete packs ...")
        # TODO: Use a public API for this once one exists
        repo_transport = self.repo._pack_collection.transport
        obsolete_pack_transport = repo_transport.clone('obsolete_packs')
        for name in obsolete_pack_transport.list_dir('.'):
            obsolete_pack_transport.delete(name)

        # If we're not done, free whatever memory we can
        if not final:
            gc.collect()

    def _get_working_trees(self, branches):
        """Get the working trees for branches in the repository."""
        result = []
        wt_expected = self.repo.make_working_trees()
        for br in branches:
            if br is None:
                continue
            elif br == self.branch:
                if self.working_tree:
                    result.append(self.working_tree)
            elif wt_expected:
                try:
                    result.append(br.controldir.open_workingtree())
                except errors.NoWorkingTree:
                    self.warning("No working tree for branch %s", br)
        return result

    def dump_stats(self):
        time_required = progress.str_tdelta(time.time() - self._start_time)
        rc = self._revision_count - self.skip_total
        bc = self._branch_count
        wtc = self._tree_count
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
                  rc, helpers.single_plural(rc, "revision", "revisions"),
                  bc, helpers.single_plural(bc, "branch", "branches"),
                  wtc, helpers.single_plural(wtc, "tree", "trees"),
                  time_required)

    def _init_id_map(self):
        """Load the id-map and check it matches the repository.

        :return: the number of entries in the map
        """
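        # The map pairs each fast-import commit-id with the Bazaar revision
        # it produced, one "commit-id revision-id" pair per line, e.g.
        # (illustrative values):
        #
        #   1 jdoe@example.com-20080101120000-0123456789abcdef
        #   2 jdoe@example.com-20080101120100-fedcba9876543210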
        # Currently, we just check the size. In the future, we might
        # decide to be more paranoid and check that the revision-ids
        # are identical as well.
        self.cache_mgr.marks, known = idmapfile.load_id_map(
            self.id_map_path)
        if self.cache_mgr.add_mark(b'0', _mod_revision.NULL_REVISION):
            known += 1

        existing_count = len(self.repo.all_revision_ids())
        if existing_count < known:
            raise plugin_errors.BadRepositorySize(known, existing_count)
        return known

    def _save_id_map(self):
        """Save the id-map."""
        # Save the whole lot every time. If this proves a problem, we can
        # change to 'append just the new ones' at a later time.
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.marks)

    def blob_handler(self, cmd):
        """Process a BlobCommand."""
        if cmd.mark is not None:
            dataref = cmd.id
        else:
            dataref = osutils.sha_strings(cmd.data)
        self.cache_mgr.store_blob(dataref, cmd.data)

    def checkpoint_handler(self, cmd):
        """Process a CheckpointCommand."""
        # Commit the current write group and start a new one
        self.repo.commit_write_group()
        self._save_id_map()
        # track the number of automatic checkpoints done
        if cmd is None:
            self.checkpoint_count += 1
            if self.checkpoint_count % self.autopack_every == 0:
                self._pack_repository(final=False)
        self.repo.start_write_group()

    def commit_handler(self, cmd):
        """Process a CommitCommand."""
        mark = cmd.id.lstrip(b':')
        if self.skip_total and self._revision_count < self.skip_total:
            self.cache_mgr.reftracker.track_heads(cmd)
            # Check that we really do know about this commit-id
            if mark not in self.cache_mgr.marks:
                raise plugin_errors.BadRestart(mark)
            self.cache_mgr._blobs = {}
            self._revision_count += 1
            if cmd.ref.startswith(b'refs/tags/'):
                tag_name = cmd.ref[len(b'refs/tags/'):]
                self._set_tag(tag_name, cmd.id)
            return
        if self.first_incremental_commit:
            self.first_incremental_commit = None
            parents = self.cache_mgr.reftracker.track_heads(cmd)

        # 'Commit' the revision and report progress
        handler = self.commit_handler_factory(
            cmd, self.cache_mgr, self.rev_store, verbose=self.verbose,
            prune_empty_dirs=self.prune_empty_dirs)
        try:
            handler.process()
        except:
            print("ABORT: exception occurred processing commit %s" % cmd.id)
            raise
        self.cache_mgr.add_mark(mark, handler.revision_id)
        self._revision_count += 1
        self.report_progress("(%s)" % cmd.id.lstrip(b':'))

        if cmd.ref.startswith(b'refs/tags/'):
            tag_name = cmd.ref[len(b'refs/tags/'):]
            self._set_tag(tag_name, cmd.id)

        # Check if we should finish up or automatically checkpoint
        if (self.max_commits is not None
                and self._revision_count >= self.max_commits):
            self.note("Stopping after reaching requested count of commits")
            self.finished = True
        elif self._revision_count % self.checkpoint_every == 0:
            self.note("%d commits - automatic checkpoint triggered",
                      self._revision_count)
            self.checkpoint_handler(None)

    def report_progress(self, details=''):
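        # Emits a timestamped line such as (illustrative):
        #   12000/35000 commits processed at 310/minute (:12000)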
        if self._revision_count % self.progress_every == 0:
            if self.total_commits is not None:
                counts = "%d/%d" % (self._revision_count, self.total_commits)
            else:
                counts = "%d" % (self._revision_count,)
            minutes = (time.time() - self._start_time) / 60
            revisions_added = self._revision_count - self.skip_total
            rate = revisions_added * 1.0 / minutes
            if rate > 10:
                rate_str = "at %.0f/minute " % rate
            else:
                rate_str = "at %.1f/minute " % rate
            self.note("%s commits processed %s%s" %
                      (counts, rate_str, details))

    def progress_handler(self, cmd):
        """Process a ProgressCommand."""
        # Most progress messages embedded in streams are annoying.
        # Ignore them unless in verbose mode.
        if self.verbose:
            self.note("progress %s" % (cmd.message,))

    def reset_handler(self, cmd):
        """Process a ResetCommand."""
        if cmd.ref.startswith(b'refs/tags/'):
            tag_name = cmd.ref[len(b'refs/tags/'):]
            if cmd.from_ is not None:
                self._set_tag(tag_name, cmd.from_)
            elif self.verbose:
                self.warning("ignoring reset refs/tags/%s - no from clause"
                             % tag_name)
            return

        if cmd.from_ is not None:
            self.cache_mgr.reftracker.track_heads_for_ref(cmd.ref, cmd.from_)

    def tag_handler(self, cmd):
        """Process a TagCommand."""
        if cmd.from_ is not None:
            self._set_tag(cmd.id, cmd.from_)
        else:
            self.warning("ignoring tag %s - no from clause" % cmd.id)

    def _set_tag(self, name, from_):
        """Define a tag given a name and import 'from' reference."""
        bzr_tag_name = name.decode('utf-8', 'replace')
        bzr_rev_id = self.cache_mgr.lookup_committish(from_)
        self.tags[bzr_tag_name] = bzr_rev_id

    def feature_handler(self, cmd):
        """Process a FeatureCommand."""
        feature = cmd.feature_name
        if feature not in commands.FEATURE_NAMES:
            raise plugin_errors.UnknownFeature(feature)

    def debug(self, msg, *args):
        """Output a debug message if the appropriate -D option was given."""
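        # These messages are enabled with the global -D debug flag,
        # e.g. ``bzr -Dfast-import fast-import ...``; without it they
        # are suppressed.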
        if "fast-import" in debug.debug_flags:
            msg = "%s DEBUG: %s" % (self._time_of_day(), msg)
            mutter(msg, *args)
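
    def _time_of_day(self):
        """Time of day as a string.

        Used by debug(), note() and warning() to timestamp their output;
        the exact format is an implementation detail (HH:MM:SS here).
        """
        return time.strftime("%H:%M:%S")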

    def note(self, msg, *args):
        """Output a note but timestamp it."""
        msg = "%s %s" % (self._time_of_day(), msg)
        note(msg, *args)

    def warning(self, msg, *args):
        """Output a warning but timestamp it."""
        msg = "%s WARNING: %s" % (self._time_of_day(), msg)
        warning(msg, *args)