# transaction.py - simple journaling scheme for mercurial
#
# This transaction scheme is intended to gracefully handle program
# errors and interruptions. More serious failures like system crashes
# can be recovered with an fsck-like tool. As the whole repository is
# effectively log-structured, this should amount to simply truncating
# anything that isn't referenced in the changelog.
#
# Copyright 2005, 2006 Olivia Mackall <olivia@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno

from .i18n import _
from . import (
    error,
    pycompat,
    util,
)
from .utils import stringutil

version = 2

# These are the file generators that should only be executed after the
# finalizers are done, since they rely on the output of the finalizers (like
# the changelog having been written).
postfinalizegenerators = {b'bookmarks', b'dirstate'}

GEN_GROUP_ALL = b'all'
GEN_GROUP_PRE_FINALIZE = b'prefinalize'
GEN_GROUP_POST_FINALIZE = b'postfinalize'


def active(func):
    """decorator restricting a method to a live (uncommitted) transaction

    Raises ProgrammingError when the wrapped method is called after the
    transaction has been committed or aborted (``_count`` dropped to 0).
    """

    def _active(self, *args, **kwds):
        if self._count == 0:
            raise error.ProgrammingError(
                b'cannot use transaction when it is already committed/aborted'
            )
        return func(self, *args, **kwds)

    return _active


def _playback(
    journal,
    report,
    opener,
    vfsmap,
    entries,
    backupentries,
    unlink=True,
    checkambigfiles=None,
):
    """roll back the changes recorded in a journal

    * `entries`: (file, offset) pairs; each file is truncated back to
      offset, or unlinked when offset is 0 and `unlink` is True.
    * `backupentries`: (location, file, backupfile, cache) tuples; backup
      copies are restored over the modified files.
    * `checkambigfiles`: set of (path, vfs-location) tuples for which file
      stat ambiguity must be avoided when truncating/restoring.

    On success the journal and backup bookkeeping files are removed.
    """
    for f, o in sorted(dict(entries).items()):
        if o or not unlink:
            checkambig = checkambigfiles and (f, b'') in checkambigfiles
            try:
                fp = opener(f, b'a', checkambig=checkambig)
                if fp.tell() < o:
                    raise error.Abort(
                        _(
                            b"attempted to truncate %s to %d bytes, but it was "
                            b"already %d bytes\n"
                        )
                        % (f, o, fp.tell())
                    )
                fp.truncate(o)
                fp.close()
            except IOError:
                report(_(b"failed to truncate %s\n") % f)
                raise
        else:
            try:
                opener.unlink(f)
            except (IOError, OSError) as inst:
                if inst.errno != errno.ENOENT:
                    raise

    backupfiles = []
    for l, f, b, c in backupentries:
        if l not in vfsmap and c:
            report(b"couldn't handle %s: unknown cache location %s\n" % (b, l))
            # skip the entry: vfsmap[l] below would raise an uncaught
            # KeyError (same guard as in transaction.close/_writeundo)
            continue
        vfs = vfsmap[l]
        try:
            if f and b:
                filepath = vfs.join(f)
                backuppath = vfs.join(b)
                checkambig = checkambigfiles and (f, l) in checkambigfiles
                try:
                    util.copyfile(backuppath, filepath, checkambig=checkambig)
                    backupfiles.append(b)
                except IOError as exc:
                    e_msg = stringutil.forcebytestr(exc)
                    report(_(b"failed to recover %s (%s)\n") % (f, e_msg))
            else:
                target = f or b
                try:
                    vfs.unlink(target)
                except (IOError, OSError) as inst:
                    if inst.errno != errno.ENOENT:
                        raise
        except (IOError, OSError, error.Abort):
            if not c:
                raise

    backuppath = b"%s.backupfiles" % journal
    if opener.exists(backuppath):
        opener.unlink(backuppath)
    opener.unlink(journal)
    try:
        for f in backupfiles:
            if opener.exists(f):
                opener.unlink(f)
    except (IOError, OSError, error.Abort):
        # only pure backup file remains, it is safe to ignore any error
        pass


class transaction(util.transactional):
    def __init__(
        self,
        report,
        opener,
        vfsmap,
        journalname,
        undoname=None,
        after=None,
        createmode=None,
        validator=None,
        releasefn=None,
        checkambigfiles=None,
        name='<unnamed>',
    ):
        """Begin a new transaction

        Begins a new transaction that allows rolling back writes in the event of
        an exception.

        * `after`: called after the transaction has been committed
        * `createmode`: the mode of the journal file that will be created
        * `releasefn`: called after releasing (with transaction and result)

        `checkambigfiles` is a set of (path, vfs-location) tuples,
        which determine whether file stat ambiguity should be avoided
        for corresponded files.
        """
        self._count = 1
        self._usages = 1
        self._report = report
        # a vfs to the store content
        self._opener = opener
        # a map to access file in various {location -> vfs}
        vfsmap = vfsmap.copy()
        vfsmap[b''] = opener  # set default value
        self._vfsmap = vfsmap
        self._after = after
        self._offsetmap = {}
        self._newfiles = set()
        self._journal = journalname
        self._undoname = undoname
        self._queue = []
        # A callback to do something just after releasing transaction.
        if releasefn is None:
            releasefn = lambda tr, success: None
        self._releasefn = releasefn

        self._checkambigfiles = set()
        if checkambigfiles:
            self._checkambigfiles.update(checkambigfiles)

        self._names = [name]

        # A dict dedicated to precisely tracking the changes introduced in the
        # transaction.
        self.changes = {}

        # a dict of arguments to be passed to hooks
        self.hookargs = {}
        self._file = opener.open(self._journal, b"w+")

        # a list of ('location', 'path', 'backuppath', cache) entries.
        # - if 'backuppath' is empty, no file existed at backup time
        # - if 'path' is empty, this is a temporary transaction file
        # - if 'location' is not empty, the path is outside main opener reach.
        #   use 'location' value as a key in a vfsmap to find the right 'vfs'
        # (cache is currently unused)
        self._backupentries = []
        self._backupmap = {}
        self._backupjournal = b"%s.backupfiles" % self._journal
        self._backupsfile = opener.open(self._backupjournal, b'w')
        self._backupsfile.write(b'%d\n' % version)

        if createmode is not None:
            opener.chmod(self._journal, createmode & 0o666)
            opener.chmod(self._backupjournal, createmode & 0o666)

        # hold file generations to be performed on commit
        self._filegenerators = {}
        # hold callback to write pending data for hooks
        self._pendingcallback = {}
        # True if any pending data have been written ever
        self._anypending = False
        # holds callback to call when writing the transaction
        self._finalizecallback = {}
        # holds callback to call when validating the transaction
        # should raise exception if anything is wrong
        self._validatecallback = {}
        if validator is not None:
            self._validatecallback[b'001-userhooks'] = validator
        # hold callback for post transaction close
        self._postclosecallback = {}
        # holds callbacks to call during abort
        self._abortcallback = {}

    def __repr__(self):
        name = '/'.join(self._names)
        return '<transaction name=%s, count=%d, usages=%d>' % (
            name,
            self._count,
            self._usages,
        )

    def __del__(self):
        if self._journal:
            self._abort()

    @active
    def startgroup(self):
        """delay registration of file entry

        This is used by strip to delay vision of strip offset. The transaction
        sees either none or all of the strip actions to be done."""
        self._queue.append([])

    @active
    def endgroup(self):
        """apply delayed registration of file entry.

        This is used by strip to delay vision of strip offset. The transaction
        sees either none or all of the strip actions to be done."""
        q = self._queue.pop()
        for f, o in q:
            self._addentry(f, o)

    @active
    def add(self, file, offset):
        """record the state of an append-only file before update"""
        if (
            file in self._newfiles
            or file in self._offsetmap
            or file in self._backupmap
        ):
            return
        if self._queue:
            self._queue[-1].append((file, offset))
            return

        self._addentry(file, offset)

    def _addentry(self, file, offset):
        """add an append-only entry to memory and on-disk state"""
        if (
            file in self._newfiles
            or file in self._offsetmap
            or file in self._backupmap
        ):
            return
        if offset:
            self._offsetmap[file] = offset
        else:
            self._newfiles.add(file)
        # add enough data to the journal to do the truncate
        self._file.write(b"%s\0%d\n" % (file, offset))
        self._file.flush()

    @active
    def addbackup(self, file, hardlink=True, location=b''):
        """Adds a backup of the file to the transaction

        Calling addbackup() creates a hardlink backup of the specified file
        that is used to recover the file in the event of the transaction
        aborting.

        * `file`: the file path, relative to .hg/store
        * `hardlink`: use a hardlink to quickly create the backup
        """
        if self._queue:
            msg = b'cannot use transaction.addbackup inside "group"'
            raise error.ProgrammingError(msg)

        if (
            file in self._newfiles
            or file in self._offsetmap
            or file in self._backupmap
        ):
            return
        vfs = self._vfsmap[location]
        dirname, filename = vfs.split(file)
        backupfilename = b"%s.backup.%s" % (self._journal, filename)
        backupfile = vfs.reljoin(dirname, backupfilename)
        if vfs.exists(file):
            filepath = vfs.join(file)
            backuppath = vfs.join(backupfile)
            util.copyfile(filepath, backuppath, hardlink=hardlink)
        else:
            backupfile = b''

        self._addbackupentry((location, file, backupfile, False))

    def _addbackupentry(self, entry):
        """register a new backup entry and write it to disk"""
        self._backupentries.append(entry)
        self._backupmap[entry[1]] = len(self._backupentries) - 1
        self._backupsfile.write(b"%s\0%s\0%s\0%d\n" % entry)
        self._backupsfile.flush()

    @active
    def registertmp(self, tmpfile, location=b''):
        """register a temporary transaction file

        Such files will be deleted when the transaction exits (on both
        failure and success).
        """
        self._addbackupentry((location, b'', tmpfile, False))

    @active
    def addfilegenerator(
        self, genid, filenames, genfunc, order=0, location=b''
    ):
        """add a function to generate some files at transaction commit

        The `genfunc` argument is a function capable of generating proper
        content of each entry in the `filename` tuple.

        At transaction close time, `genfunc` will be called with one file
        object argument per entries in `filenames`.

        The transaction itself is responsible for the backup, creation and
        final write of such file.

        The `genid` argument is used to ensure the same set of file is only
        generated once. Call to `addfilegenerator` for a `genid` already
        present will overwrite the old entry.

        The `order` argument may be used to control the order in which multiple
        generator will be executed.

        The `location` arguments may be used to indicate the files are located
        outside of the standard directory for transaction. It should match
        one of the key of the `transaction.vfsmap` dictionary.
        """
        # For now, we are unable to do proper backup and restore of custom vfs
        # but for bookmarks that are handled outside this mechanism.
        self._filegenerators[genid] = (order, filenames, genfunc, location)

    @active
    def removefilegenerator(self, genid):
        """reverse of addfilegenerator, remove a file generator function"""
        if genid in self._filegenerators:
            del self._filegenerators[genid]

    def _generatefiles(self, suffix=b'', group=GEN_GROUP_ALL):
        # write files registered for generation
        any = False

        if group == GEN_GROUP_ALL:
            skip_post = skip_pre = False
        else:
            skip_pre = group == GEN_GROUP_POST_FINALIZE
            skip_post = group == GEN_GROUP_PRE_FINALIZE

        for id, entry in sorted(pycompat.iteritems(self._filegenerators)):
            any = True
            order, filenames, genfunc, location = entry

            # for generation at closing, check if it's before or after finalize
            is_post = id in postfinalizegenerators
            if skip_post and is_post:
                continue
            elif skip_pre and not is_post:
                continue

            vfs = self._vfsmap[location]
            files = []
            try:
                for name in filenames:
                    name += suffix
                    if suffix:
                        self.registertmp(name, location=location)
                        checkambig = False
                    else:
                        self.addbackup(name, location=location)
                        checkambig = (name, location) in self._checkambigfiles
                    files.append(
                        vfs(name, b'w', atomictemp=True, checkambig=checkambig)
                    )
                genfunc(*files)
                for f in files:
                    f.close()
                # skip discard() loop since we're sure no open file remains
                del files[:]
            finally:
                for f in files:
                    f.discard()
        return any

    @active
    def findoffset(self, file):
        if file in self._newfiles:
            return 0
        return self._offsetmap.get(file)

    @active
    def readjournal(self):
        self._file.seek(0)
        entries = []
        for l in self._file.readlines():
            file, troffset = l.split(b'\0')
            entries.append((file, int(troffset)))
        return entries

    @active
    def replace(self, file, offset):
        """
        replace can only replace already committed entries
        that are not pending in the queue
        """
        if file in self._newfiles:
            if not offset:
                return
            self._newfiles.remove(file)
            self._offsetmap[file] = offset
        elif file in self._offsetmap:
            if not offset:
                del self._offsetmap[file]
                self._newfiles.add(file)
            else:
                self._offsetmap[file] = offset
        else:
            raise KeyError(file)
        self._file.write(b"%s\0%d\n" % (file, offset))
        self._file.flush()

    @active
    def nest(self, name='<unnamed>'):
        self._count += 1
        self._usages += 1
        self._names.append(name)
        return self

    def release(self):
        if self._count > 0:
            self._usages -= 1
        if self._names:
            self._names.pop()
        # if the transaction scopes are left without being closed, fail
        if self._count > 0 and self._usages == 0:
            self._abort()

    def running(self):
        return self._count > 0

    def addpending(self, category, callback):
        """add a callback to be called when the transaction is pending

        The transaction will be given as callback's first argument.

        Category is a unique identifier to allow overwriting an old callback
        with a newer callback.
        """
        self._pendingcallback[category] = callback

    @active
    def writepending(self):
        """write pending file to temporary version

        This is used to allow hooks to view a transaction before commit"""
        categories = sorted(self._pendingcallback)
        for cat in categories:
            # remove callback since the data will have been flushed
            any = self._pendingcallback.pop(cat)(self)
            self._anypending = self._anypending or any
        self._anypending |= self._generatefiles(suffix=b'.pending')
        return self._anypending

    @active
    def hasfinalize(self, category):
        """check if a callback already exists for a category"""
        return category in self._finalizecallback

    @active
    def addfinalize(self, category, callback):
        """add a callback to be called when the transaction is closed

        The transaction will be given as callback's first argument.

        Category is a unique identifier to allow overwriting old callbacks with
        newer callbacks.
        """
        self._finalizecallback[category] = callback

    @active
    def addpostclose(self, category, callback):
        """add or replace a callback to be called after the transaction closed

        The transaction will be given as callback's first argument.

        Category is a unique identifier to allow overwriting an old callback
        with a newer callback.
        """
        self._postclosecallback[category] = callback

    @active
    def getpostclose(self, category):
        """return a postclose callback added before, or None"""
        return self._postclosecallback.get(category, None)

    @active
    def addabort(self, category, callback):
        """add a callback to be called when the transaction is aborted.

        The transaction will be given as the first argument to the callback.

        Category is a unique identifier to allow overwriting an old callback
        with a newer callback.
        """
        self._abortcallback[category] = callback

    @active
    def addvalidator(self, category, callback):
        """adds a callback to be called when validating the transaction.

        The transaction will be given as the first argument to the callback.

        The callback should raise an exception to abort the transaction."""
        self._validatecallback[category] = callback

    @active
    def close(self):
        '''commit the transaction'''
        if self._count == 1:
            for category in sorted(self._validatecallback):
                self._validatecallback[category](self)
            self._validatecallback = None  # Help prevent cycles.
            self._generatefiles(group=GEN_GROUP_PRE_FINALIZE)
            while self._finalizecallback:
                callbacks = self._finalizecallback
                self._finalizecallback = {}
                categories = sorted(callbacks)
                for cat in categories:
                    callbacks[cat](self)
            # Prevent double usage and help clear cycles.
            self._finalizecallback = None
            self._generatefiles(group=GEN_GROUP_POST_FINALIZE)

        self._count -= 1
        if self._count != 0:
            return
        self._file.close()
        self._backupsfile.close()
        # cleanup temporary files
        for l, f, b, c in self._backupentries:
            if l not in self._vfsmap and c:
                self._report(
                    b"couldn't remove %s: unknown cache location %s\n" % (b, l)
                )
                continue
            vfs = self._vfsmap[l]
            if not f and b and vfs.exists(b):
                try:
                    vfs.unlink(b)
                except (IOError, OSError, error.Abort) as inst:
                    if not c:
                        raise
                    # Abort may be raised by read only opener
                    self._report(
                        b"couldn't remove %s: %s\n" % (vfs.join(b), inst)
                    )
        self._offsetmap = {}
        self._newfiles = set()
        self._writeundo()
        if self._after:
            self._after()
            self._after = None  # Help prevent cycles.
        if self._opener.isfile(self._backupjournal):
            self._opener.unlink(self._backupjournal)
        if self._opener.isfile(self._journal):
            self._opener.unlink(self._journal)
        for l, _f, b, c in self._backupentries:
            if l not in self._vfsmap and c:
                self._report(
                    b"couldn't remove %s: unknown cache location"
                    b"%s\n" % (b, l)
                )
                continue
            vfs = self._vfsmap[l]
            if b and vfs.exists(b):
                try:
                    vfs.unlink(b)
                except (IOError, OSError, error.Abort) as inst:
                    if not c:
                        raise
                    # Abort may be raised by read only opener
                    self._report(
                        b"couldn't remove %s: %s\n" % (vfs.join(b), inst)
                    )
        self._backupentries = []
        self._journal = None

        self._releasefn(self, True)  # notify success of closing transaction
        self._releasefn = None  # Help prevent cycles.

        # run post close action
        categories = sorted(self._postclosecallback)
        for cat in categories:
            self._postclosecallback[cat](self)
        # Prevent double usage and help clear cycles.
        self._postclosecallback = None

    @active
    def abort(self):
        """abort the transaction (generally called on error, or when the
        transaction is not explicitly committed before going out of
        scope)"""
        self._abort()

    def _writeundo(self):
        """write transaction data for possible future undo call"""
        if self._undoname is None:
            return

        undo_backup_path = b"%s.backupfiles" % self._undoname
        undobackupfile = self._opener.open(undo_backup_path, b'w')
        undobackupfile.write(b'%d\n' % version)
        for l, f, b, c in self._backupentries:
            if not f:  # temporary file
                continue
            if not b:
                u = b''
            else:
                if l not in self._vfsmap and c:
                    self._report(
                        b"couldn't remove %s: unknown cache location"
                        b"%s\n" % (b, l)
                    )
                    continue
                vfs = self._vfsmap[l]
                base, name = vfs.split(b)
                assert name.startswith(self._journal), name
                uname = name.replace(self._journal, self._undoname, 1)
                u = vfs.reljoin(base, uname)
                util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
            undobackupfile.write(b"%s\0%s\0%s\0%d\n" % (l, f, u, c))
        undobackupfile.close()

    def _abort(self):
        entries = self.readjournal()
        self._count = 0
        self._usages = 0
        self._file.close()
        self._backupsfile.close()

        try:
            if not entries and not self._backupentries:
                if self._backupjournal:
                    self._opener.unlink(self._backupjournal)
                if self._journal:
                    self._opener.unlink(self._journal)
                return

            self._report(_(b"transaction abort!\n"))

            try:
                for cat in sorted(self._abortcallback):
                    self._abortcallback[cat](self)
                # Prevent double usage and help clear cycles.
                self._abortcallback = None
                _playback(
                    self._journal,
                    self._report,
                    self._opener,
                    self._vfsmap,
                    entries,
                    self._backupentries,
                    False,
                    checkambigfiles=self._checkambigfiles,
                )
                self._report(_(b"rollback completed\n"))
            except BaseException as exc:
                self._report(_(b"rollback failed - please run hg recover\n"))
                self._report(
                    _(b"(failure reason: %s)\n") % stringutil.forcebytestr(exc)
                )
        finally:
            self._journal = None
            self._releasefn(self, False)  # notify failure of transaction
            self._releasefn = None  # Help prevent cycles.


BAD_VERSION_MSG = _(
    b"journal was created by a different version of Mercurial\n"
)


def rollback(opener, vfsmap, file, report, checkambigfiles=None):
    """Rolls back the transaction contained in the given file

    Reads the entries in the specified file, and the corresponding
    '*.backupfiles' file, to recover from an incomplete transaction.

    * `file`: a file containing a list of entries, specifying where
    to truncate each file. The file should contain a list of
    file\0offset pairs, delimited by newlines. The corresponding
    '*.backupfiles' file should contain a list of file\0backupfile
    pairs, delimited by \0.

    `checkambigfiles` is a set of (path, vfs-location) tuples,
    which determine whether file stat ambiguity should be avoided at
    restoring corresponded files.
    """
    entries = []
    backupentries = []

    with opener.open(file) as fp:
        lines = fp.readlines()
    for l in lines:
        try:
            f, o = l.split(b'\0')
            entries.append((f, int(o)))
        except ValueError:
            report(
                _(b"couldn't read journal entry %r!\n") % pycompat.bytestr(l)
            )

    backupjournal = b"%s.backupfiles" % file
    if opener.exists(backupjournal):
        fp = opener.open(backupjournal)
        lines = fp.readlines()
        if lines:
            ver = lines[0][:-1]
            if ver != (b'%d' % version):
                report(BAD_VERSION_MSG)
            else:
                for line in lines[1:]:
                    if line:
                        # Shave off the trailing newline
                        line = line[:-1]
                        l, f, b, c = line.split(b'\0')
                        backupentries.append((l, f, b, bool(c)))

    _playback(
        file,
        report,
        opener,
        vfsmap,
        entries,
        backupentries,
        checkambigfiles=checkambigfiles,
    )