1# transaction.py - simple journaling scheme for mercurial
2#
3# This transaction scheme is intended to gracefully handle program
4# errors and interruptions. More serious failures like system crashes
5# can be recovered with an fsck-like tool. As the whole repository is
6# effectively log-structured, this should amount to simply truncating
7# anything that isn't referenced in the changelog.
8#
9# Copyright 2005, 2006 Olivia Mackall <olivia@selenic.com>
10#
11# This software may be used and distributed according to the terms of the
12# GNU General Public License version 2 or any later version.
13
from __future__ import absolute_import

import errno
import functools

from .i18n import _
from . import (
    error,
    pycompat,
    util,
)
from .utils import stringutil
25
26version = 2
27
28# These are the file generators that should only be executed after the
29# finalizers are done, since they rely on the output of the finalizers (like
30# the changelog having been written).
31postfinalizegenerators = {b'bookmarks', b'dirstate'}
32
33GEN_GROUP_ALL = b'all'
34GEN_GROUP_PRE_FINALIZE = b'prefinalize'
35GEN_GROUP_POST_FINALIZE = b'postfinalize'
36
37
38def active(func):
39    def _active(self, *args, **kwds):
40        if self._count == 0:
41            raise error.ProgrammingError(
42                b'cannot use transaction when it is already committed/aborted'
43            )
44        return func(self, *args, **kwds)
45
46    return _active
47
48
49def _playback(
50    journal,
51    report,
52    opener,
53    vfsmap,
54    entries,
55    backupentries,
56    unlink=True,
57    checkambigfiles=None,
58):
59    for f, o in sorted(dict(entries).items()):
60        if o or not unlink:
61            checkambig = checkambigfiles and (f, b'') in checkambigfiles
62            try:
63                fp = opener(f, b'a', checkambig=checkambig)
64                if fp.tell() < o:
65                    raise error.Abort(
66                        _(
67                            b"attempted to truncate %s to %d bytes, but it was "
68                            b"already %d bytes\n"
69                        )
70                        % (f, o, fp.tell())
71                    )
72                fp.truncate(o)
73                fp.close()
74            except IOError:
75                report(_(b"failed to truncate %s\n") % f)
76                raise
77        else:
78            try:
79                opener.unlink(f)
80            except (IOError, OSError) as inst:
81                if inst.errno != errno.ENOENT:
82                    raise
83
84    backupfiles = []
85    for l, f, b, c in backupentries:
86        if l not in vfsmap and c:
87            report(b"couldn't handle %s: unknown cache location %s\n" % (b, l))
88        vfs = vfsmap[l]
89        try:
90            if f and b:
91                filepath = vfs.join(f)
92                backuppath = vfs.join(b)
93                checkambig = checkambigfiles and (f, l) in checkambigfiles
94                try:
95                    util.copyfile(backuppath, filepath, checkambig=checkambig)
96                    backupfiles.append(b)
97                except IOError as exc:
98                    e_msg = stringutil.forcebytestr(exc)
99                    report(_(b"failed to recover %s (%s)\n") % (f, e_msg))
100            else:
101                target = f or b
102                try:
103                    vfs.unlink(target)
104                except (IOError, OSError) as inst:
105                    if inst.errno != errno.ENOENT:
106                        raise
107        except (IOError, OSError, error.Abort):
108            if not c:
109                raise
110
111    backuppath = b"%s.backupfiles" % journal
112    if opener.exists(backuppath):
113        opener.unlink(backuppath)
114    opener.unlink(journal)
115    try:
116        for f in backupfiles:
117            if opener.exists(f):
118                opener.unlink(f)
119    except (IOError, OSError, error.Abort):
120        # only pure backup file remains, it is sage to ignore any error
121        pass
122
123
124class transaction(util.transactional):
125    def __init__(
126        self,
127        report,
128        opener,
129        vfsmap,
130        journalname,
131        undoname=None,
132        after=None,
133        createmode=None,
134        validator=None,
135        releasefn=None,
136        checkambigfiles=None,
137        name='<unnamed>',
138    ):
139        """Begin a new transaction
140
141        Begins a new transaction that allows rolling back writes in the event of
142        an exception.
143
144        * `after`: called after the transaction has been committed
145        * `createmode`: the mode of the journal file that will be created
146        * `releasefn`: called after releasing (with transaction and result)
147
148        `checkambigfiles` is a set of (path, vfs-location) tuples,
149        which determine whether file stat ambiguity should be avoided
150        for corresponded files.
151        """
152        self._count = 1
153        self._usages = 1
154        self._report = report
155        # a vfs to the store content
156        self._opener = opener
157        # a map to access file in various {location -> vfs}
158        vfsmap = vfsmap.copy()
159        vfsmap[b''] = opener  # set default value
160        self._vfsmap = vfsmap
161        self._after = after
162        self._offsetmap = {}
163        self._newfiles = set()
164        self._journal = journalname
165        self._undoname = undoname
166        self._queue = []
167        # A callback to do something just after releasing transaction.
168        if releasefn is None:
169            releasefn = lambda tr, success: None
170        self._releasefn = releasefn
171
172        self._checkambigfiles = set()
173        if checkambigfiles:
174            self._checkambigfiles.update(checkambigfiles)
175
176        self._names = [name]
177
178        # A dict dedicated to precisely tracking the changes introduced in the
179        # transaction.
180        self.changes = {}
181
182        # a dict of arguments to be passed to hooks
183        self.hookargs = {}
184        self._file = opener.open(self._journal, b"w+")
185
186        # a list of ('location', 'path', 'backuppath', cache) entries.
187        # - if 'backuppath' is empty, no file existed at backup time
188        # - if 'path' is empty, this is a temporary transaction file
189        # - if 'location' is not empty, the path is outside main opener reach.
190        #   use 'location' value as a key in a vfsmap to find the right 'vfs'
191        # (cache is currently unused)
192        self._backupentries = []
193        self._backupmap = {}
194        self._backupjournal = b"%s.backupfiles" % self._journal
195        self._backupsfile = opener.open(self._backupjournal, b'w')
196        self._backupsfile.write(b'%d\n' % version)
197
198        if createmode is not None:
199            opener.chmod(self._journal, createmode & 0o666)
200            opener.chmod(self._backupjournal, createmode & 0o666)
201
202        # hold file generations to be performed on commit
203        self._filegenerators = {}
204        # hold callback to write pending data for hooks
205        self._pendingcallback = {}
206        # True is any pending data have been written ever
207        self._anypending = False
208        # holds callback to call when writing the transaction
209        self._finalizecallback = {}
210        # holds callback to call when validating the transaction
211        # should raise exception if anything is wrong
212        self._validatecallback = {}
213        if validator is not None:
214            self._validatecallback[b'001-userhooks'] = validator
215        # hold callback for post transaction close
216        self._postclosecallback = {}
217        # holds callbacks to call during abort
218        self._abortcallback = {}
219
220    def __repr__(self):
221        name = '/'.join(self._names)
222        return '<transaction name=%s, count=%d, usages=%d>' % (
223            name,
224            self._count,
225            self._usages,
226        )
227
228    def __del__(self):
229        if self._journal:
230            self._abort()
231
232    @active
233    def startgroup(self):
234        """delay registration of file entry
235
236        This is used by strip to delay vision of strip offset. The transaction
237        sees either none or all of the strip actions to be done."""
238        self._queue.append([])
239
240    @active
241    def endgroup(self):
242        """apply delayed registration of file entry.
243
244        This is used by strip to delay vision of strip offset. The transaction
245        sees either none or all of the strip actions to be done."""
246        q = self._queue.pop()
247        for f, o in q:
248            self._addentry(f, o)
249
250    @active
251    def add(self, file, offset):
252        """record the state of an append-only file before update"""
253        if (
254            file in self._newfiles
255            or file in self._offsetmap
256            or file in self._backupmap
257        ):
258            return
259        if self._queue:
260            self._queue[-1].append((file, offset))
261            return
262
263        self._addentry(file, offset)
264
265    def _addentry(self, file, offset):
266        """add a append-only entry to memory and on-disk state"""
267        if (
268            file in self._newfiles
269            or file in self._offsetmap
270            or file in self._backupmap
271        ):
272            return
273        if offset:
274            self._offsetmap[file] = offset
275        else:
276            self._newfiles.add(file)
277        # add enough data to the journal to do the truncate
278        self._file.write(b"%s\0%d\n" % (file, offset))
279        self._file.flush()
280
281    @active
282    def addbackup(self, file, hardlink=True, location=b''):
283        """Adds a backup of the file to the transaction
284
285        Calling addbackup() creates a hardlink backup of the specified file
286        that is used to recover the file in the event of the transaction
287        aborting.
288
289        * `file`: the file path, relative to .hg/store
290        * `hardlink`: use a hardlink to quickly create the backup
291        """
292        if self._queue:
293            msg = b'cannot use transaction.addbackup inside "group"'
294            raise error.ProgrammingError(msg)
295
296        if (
297            file in self._newfiles
298            or file in self._offsetmap
299            or file in self._backupmap
300        ):
301            return
302        vfs = self._vfsmap[location]
303        dirname, filename = vfs.split(file)
304        backupfilename = b"%s.backup.%s" % (self._journal, filename)
305        backupfile = vfs.reljoin(dirname, backupfilename)
306        if vfs.exists(file):
307            filepath = vfs.join(file)
308            backuppath = vfs.join(backupfile)
309            util.copyfile(filepath, backuppath, hardlink=hardlink)
310        else:
311            backupfile = b''
312
313        self._addbackupentry((location, file, backupfile, False))
314
315    def _addbackupentry(self, entry):
316        """register a new backup entry and write it to disk"""
317        self._backupentries.append(entry)
318        self._backupmap[entry[1]] = len(self._backupentries) - 1
319        self._backupsfile.write(b"%s\0%s\0%s\0%d\n" % entry)
320        self._backupsfile.flush()
321
322    @active
323    def registertmp(self, tmpfile, location=b''):
324        """register a temporary transaction file
325
326        Such files will be deleted when the transaction exits (on both
327        failure and success).
328        """
329        self._addbackupentry((location, b'', tmpfile, False))
330
331    @active
332    def addfilegenerator(
333        self, genid, filenames, genfunc, order=0, location=b''
334    ):
335        """add a function to generates some files at transaction commit
336
337        The `genfunc` argument is a function capable of generating proper
338        content of each entry in the `filename` tuple.
339
340        At transaction close time, `genfunc` will be called with one file
341        object argument per entries in `filenames`.
342
343        The transaction itself is responsible for the backup, creation and
344        final write of such file.
345
346        The `genid` argument is used to ensure the same set of file is only
347        generated once. Call to `addfilegenerator` for a `genid` already
348        present will overwrite the old entry.
349
350        The `order` argument may be used to control the order in which multiple
351        generator will be executed.
352
353        The `location` arguments may be used to indicate the files are located
354        outside of the the standard directory for transaction. It should match
355        one of the key of the `transaction.vfsmap` dictionary.
356        """
357        # For now, we are unable to do proper backup and restore of custom vfs
358        # but for bookmarks that are handled outside this mechanism.
359        self._filegenerators[genid] = (order, filenames, genfunc, location)
360
361    @active
362    def removefilegenerator(self, genid):
363        """reverse of addfilegenerator, remove a file generator function"""
364        if genid in self._filegenerators:
365            del self._filegenerators[genid]
366
367    def _generatefiles(self, suffix=b'', group=GEN_GROUP_ALL):
368        # write files registered for generation
369        any = False
370
371        if group == GEN_GROUP_ALL:
372            skip_post = skip_pre = False
373        else:
374            skip_pre = group == GEN_GROUP_POST_FINALIZE
375            skip_post = group == GEN_GROUP_PRE_FINALIZE
376
377        for id, entry in sorted(pycompat.iteritems(self._filegenerators)):
378            any = True
379            order, filenames, genfunc, location = entry
380
381            # for generation at closing, check if it's before or after finalize
382            is_post = id in postfinalizegenerators
383            if skip_post and is_post:
384                continue
385            elif skip_pre and not is_post:
386                continue
387
388            vfs = self._vfsmap[location]
389            files = []
390            try:
391                for name in filenames:
392                    name += suffix
393                    if suffix:
394                        self.registertmp(name, location=location)
395                        checkambig = False
396                    else:
397                        self.addbackup(name, location=location)
398                        checkambig = (name, location) in self._checkambigfiles
399                    files.append(
400                        vfs(name, b'w', atomictemp=True, checkambig=checkambig)
401                    )
402                genfunc(*files)
403                for f in files:
404                    f.close()
405                # skip discard() loop since we're sure no open file remains
406                del files[:]
407            finally:
408                for f in files:
409                    f.discard()
410        return any
411
412    @active
413    def findoffset(self, file):
414        if file in self._newfiles:
415            return 0
416        return self._offsetmap.get(file)
417
418    @active
419    def readjournal(self):
420        self._file.seek(0)
421        entries = []
422        for l in self._file.readlines():
423            file, troffset = l.split(b'\0')
424            entries.append((file, int(troffset)))
425        return entries
426
427    @active
428    def replace(self, file, offset):
429        """
430        replace can only replace already committed entries
431        that are not pending in the queue
432        """
433        if file in self._newfiles:
434            if not offset:
435                return
436            self._newfiles.remove(file)
437            self._offsetmap[file] = offset
438        elif file in self._offsetmap:
439            if not offset:
440                del self._offsetmap[file]
441                self._newfiles.add(file)
442            else:
443                self._offsetmap[file] = offset
444        else:
445            raise KeyError(file)
446        self._file.write(b"%s\0%d\n" % (file, offset))
447        self._file.flush()
448
449    @active
450    def nest(self, name='<unnamed>'):
451        self._count += 1
452        self._usages += 1
453        self._names.append(name)
454        return self
455
456    def release(self):
457        if self._count > 0:
458            self._usages -= 1
459        if self._names:
460            self._names.pop()
461        # if the transaction scopes are left without being closed, fail
462        if self._count > 0 and self._usages == 0:
463            self._abort()
464
465    def running(self):
466        return self._count > 0
467
468    def addpending(self, category, callback):
469        """add a callback to be called when the transaction is pending
470
471        The transaction will be given as callback's first argument.
472
473        Category is a unique identifier to allow overwriting an old callback
474        with a newer callback.
475        """
476        self._pendingcallback[category] = callback
477
478    @active
479    def writepending(self):
480        """write pending file to temporary version
481
482        This is used to allow hooks to view a transaction before commit"""
483        categories = sorted(self._pendingcallback)
484        for cat in categories:
485            # remove callback since the data will have been flushed
486            any = self._pendingcallback.pop(cat)(self)
487            self._anypending = self._anypending or any
488        self._anypending |= self._generatefiles(suffix=b'.pending')
489        return self._anypending
490
491    @active
492    def hasfinalize(self, category):
493        """check is a callback already exist for a category"""
494        return category in self._finalizecallback
495
496    @active
497    def addfinalize(self, category, callback):
498        """add a callback to be called when the transaction is closed
499
500        The transaction will be given as callback's first argument.
501
502        Category is a unique identifier to allow overwriting old callbacks with
503        newer callbacks.
504        """
505        self._finalizecallback[category] = callback
506
507    @active
508    def addpostclose(self, category, callback):
509        """add or replace a callback to be called after the transaction closed
510
511        The transaction will be given as callback's first argument.
512
513        Category is a unique identifier to allow overwriting an old callback
514        with a newer callback.
515        """
516        self._postclosecallback[category] = callback
517
518    @active
519    def getpostclose(self, category):
520        """return a postclose callback added before, or None"""
521        return self._postclosecallback.get(category, None)
522
523    @active
524    def addabort(self, category, callback):
525        """add a callback to be called when the transaction is aborted.
526
527        The transaction will be given as the first argument to the callback.
528
529        Category is a unique identifier to allow overwriting an old callback
530        with a newer callback.
531        """
532        self._abortcallback[category] = callback
533
534    @active
535    def addvalidator(self, category, callback):
536        """adds a callback to be called when validating the transaction.
537
538        The transaction will be given as the first argument to the callback.
539
540        callback should raise exception if to abort transaction"""
541        self._validatecallback[category] = callback
542
543    @active
544    def close(self):
545        '''commit the transaction'''
546        if self._count == 1:
547            for category in sorted(self._validatecallback):
548                self._validatecallback[category](self)
549            self._validatecallback = None  # Help prevent cycles.
550            self._generatefiles(group=GEN_GROUP_PRE_FINALIZE)
551            while self._finalizecallback:
552                callbacks = self._finalizecallback
553                self._finalizecallback = {}
554                categories = sorted(callbacks)
555                for cat in categories:
556                    callbacks[cat](self)
557            # Prevent double usage and help clear cycles.
558            self._finalizecallback = None
559            self._generatefiles(group=GEN_GROUP_POST_FINALIZE)
560
561        self._count -= 1
562        if self._count != 0:
563            return
564        self._file.close()
565        self._backupsfile.close()
566        # cleanup temporary files
567        for l, f, b, c in self._backupentries:
568            if l not in self._vfsmap and c:
569                self._report(
570                    b"couldn't remove %s: unknown cache location %s\n" % (b, l)
571                )
572                continue
573            vfs = self._vfsmap[l]
574            if not f and b and vfs.exists(b):
575                try:
576                    vfs.unlink(b)
577                except (IOError, OSError, error.Abort) as inst:
578                    if not c:
579                        raise
580                    # Abort may be raise by read only opener
581                    self._report(
582                        b"couldn't remove %s: %s\n" % (vfs.join(b), inst)
583                    )
584        self._offsetmap = {}
585        self._newfiles = set()
586        self._writeundo()
587        if self._after:
588            self._after()
589            self._after = None  # Help prevent cycles.
590        if self._opener.isfile(self._backupjournal):
591            self._opener.unlink(self._backupjournal)
592        if self._opener.isfile(self._journal):
593            self._opener.unlink(self._journal)
594        for l, _f, b, c in self._backupentries:
595            if l not in self._vfsmap and c:
596                self._report(
597                    b"couldn't remove %s: unknown cache location"
598                    b"%s\n" % (b, l)
599                )
600                continue
601            vfs = self._vfsmap[l]
602            if b and vfs.exists(b):
603                try:
604                    vfs.unlink(b)
605                except (IOError, OSError, error.Abort) as inst:
606                    if not c:
607                        raise
608                    # Abort may be raise by read only opener
609                    self._report(
610                        b"couldn't remove %s: %s\n" % (vfs.join(b), inst)
611                    )
612        self._backupentries = []
613        self._journal = None
614
615        self._releasefn(self, True)  # notify success of closing transaction
616        self._releasefn = None  # Help prevent cycles.
617
618        # run post close action
619        categories = sorted(self._postclosecallback)
620        for cat in categories:
621            self._postclosecallback[cat](self)
622        # Prevent double usage and help clear cycles.
623        self._postclosecallback = None
624
625    @active
626    def abort(self):
627        """abort the transaction (generally called on error, or when the
628        transaction is not explicitly committed before going out of
629        scope)"""
630        self._abort()
631
632    def _writeundo(self):
633        """write transaction data for possible future undo call"""
634        if self._undoname is None:
635            return
636
637        undo_backup_path = b"%s.backupfiles" % self._undoname
638        undobackupfile = self._opener.open(undo_backup_path, b'w')
639        undobackupfile.write(b'%d\n' % version)
640        for l, f, b, c in self._backupentries:
641            if not f:  # temporary file
642                continue
643            if not b:
644                u = b''
645            else:
646                if l not in self._vfsmap and c:
647                    self._report(
648                        b"couldn't remove %s: unknown cache location"
649                        b"%s\n" % (b, l)
650                    )
651                    continue
652                vfs = self._vfsmap[l]
653                base, name = vfs.split(b)
654                assert name.startswith(self._journal), name
655                uname = name.replace(self._journal, self._undoname, 1)
656                u = vfs.reljoin(base, uname)
657                util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
658            undobackupfile.write(b"%s\0%s\0%s\0%d\n" % (l, f, u, c))
659        undobackupfile.close()
660
661    def _abort(self):
662        entries = self.readjournal()
663        self._count = 0
664        self._usages = 0
665        self._file.close()
666        self._backupsfile.close()
667
668        try:
669            if not entries and not self._backupentries:
670                if self._backupjournal:
671                    self._opener.unlink(self._backupjournal)
672                if self._journal:
673                    self._opener.unlink(self._journal)
674                return
675
676            self._report(_(b"transaction abort!\n"))
677
678            try:
679                for cat in sorted(self._abortcallback):
680                    self._abortcallback[cat](self)
681                # Prevent double usage and help clear cycles.
682                self._abortcallback = None
683                _playback(
684                    self._journal,
685                    self._report,
686                    self._opener,
687                    self._vfsmap,
688                    entries,
689                    self._backupentries,
690                    False,
691                    checkambigfiles=self._checkambigfiles,
692                )
693                self._report(_(b"rollback completed\n"))
694            except BaseException as exc:
695                self._report(_(b"rollback failed - please run hg recover\n"))
696                self._report(
697                    _(b"(failure reason: %s)\n") % stringutil.forcebytestr(exc)
698                )
699        finally:
700            self._journal = None
701            self._releasefn(self, False)  # notify failure of transaction
702            self._releasefn = None  # Help prevent cycles.
703
704
705BAD_VERSION_MSG = _(
706    b"journal was created by a different version of Mercurial\n"
707)
708
709
710def rollback(opener, vfsmap, file, report, checkambigfiles=None):
711    """Rolls back the transaction contained in the given file
712
713    Reads the entries in the specified file, and the corresponding
714    '*.backupfiles' file, to recover from an incomplete transaction.
715
716    * `file`: a file containing a list of entries, specifying where
717    to truncate each file.  The file should contain a list of
718    file\0offset pairs, delimited by newlines. The corresponding
719    '*.backupfiles' file should contain a list of file\0backupfile
720    pairs, delimited by \0.
721
722    `checkambigfiles` is a set of (path, vfs-location) tuples,
723    which determine whether file stat ambiguity should be avoided at
724    restoring corresponded files.
725    """
726    entries = []
727    backupentries = []
728
729    with opener.open(file) as fp:
730        lines = fp.readlines()
731    for l in lines:
732        try:
733            f, o = l.split(b'\0')
734            entries.append((f, int(o)))
735        except ValueError:
736            report(
737                _(b"couldn't read journal entry %r!\n") % pycompat.bytestr(l)
738            )
739
740    backupjournal = b"%s.backupfiles" % file
741    if opener.exists(backupjournal):
742        fp = opener.open(backupjournal)
743        lines = fp.readlines()
744        if lines:
745            ver = lines[0][:-1]
746            if ver != (b'%d' % version):
747                report(BAD_VERSION_MSG)
748            else:
749                for line in lines[1:]:
750                    if line:
751                        # Shave off the trailing newline
752                        line = line[:-1]
753                        l, f, b, c = line.split(b'\0')
754                        backupentries.append((l, f, b, bool(c)))
755
756    _playback(
757        file,
758        report,
759        opener,
760        vfsmap,
761        entries,
762        backupentries,
763        checkambigfiles=checkambigfiles,
764    )
765