##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Storage implementation using a log written to a single file.
"""
from __future__ import print_function

import binascii
import contextlib
import errno
import logging
import os
import time
from struct import pack
from struct import unpack

from persistent.TimeStamp import TimeStamp
from six import string_types as STRING_TYPES
from zc.lockfile import LockFile
from zope.interface import alsoProvides
from zope.interface import implementer

from .. import utils

from ZODB.blob import BlobStorageMixin
from ZODB.blob import link_or_copy
from ZODB.blob import remove_committed
from ZODB.blob import remove_committed_dir
from ZODB.BaseStorage import BaseStorage
from ZODB.BaseStorage import DataRecord as _DataRecord
from ZODB.BaseStorage import TransactionRecord as _TransactionRecord
from ZODB.ConflictResolution import ConflictResolvingStorage
from ZODB.FileStorage.format import CorruptedDataError
from ZODB.FileStorage.format import CorruptedError
from ZODB.FileStorage.format import DATA_HDR
from ZODB.FileStorage.format import DATA_HDR_LEN
from ZODB.FileStorage.format import DataHeader
from ZODB.FileStorage.format import FileStorageFormatter
from ZODB.FileStorage.format import TRANS_HDR
from ZODB.FileStorage.format import TRANS_HDR_LEN
from ZODB.FileStorage.format import TxnHeader
from ZODB.FileStorage.fspack import FileStoragePacker
from ZODB.interfaces import IBlobStorageRestoreable
from ZODB.interfaces import IExternalGC
from ZODB.interfaces import IStorage
from ZODB.interfaces import IStorageCurrentRecordIteration
from ZODB.interfaces import IStorageIteration
from ZODB.interfaces import IStorageRestoreable
from ZODB.interfaces import IStorageUndoable
from ZODB.POSException import ConflictError
from ZODB.POSException import MultipleUndoErrors
from ZODB.POSException import POSKeyError
from ZODB.POSException import ReadOnlyError
from ZODB.POSException import StorageError
from ZODB.POSException import StorageSystemError
from ZODB.POSException import StorageTransactionError
from ZODB.POSException import UndoError
from ZODB.fsIndex import fsIndex
from ZODB.utils import as_bytes
from ZODB.utils import as_text
from ZODB.utils import cp
from ZODB.utils import load_current
from ZODB.utils import mktemp
from ZODB.utils import p64
from ZODB.utils import u64
from ZODB.utils import z64
from ZODB._compat import Pickler
from ZODB._compat import loads
from ZODB._compat import decodebytes
from ZODB._compat import encodebytes
from ZODB._compat import _protocol
from ZODB._compat import FILESTORAGE_MAGIC


# Not all platforms have fsync
fsync = getattr(os, "fsync", None)

packed_version = FILESTORAGE_MAGIC

logger = logging.getLogger('ZODB.FileStorage')


def panic(message, *data):
    logger.critical(message, *data)
    raise CorruptedTransactionError(message % data)


class FileStorageError(StorageError):
    pass


class PackError(FileStorageError):
    pass


class FileStorageFormatError(FileStorageError):
    """Invalid file format

    The format of the given file is not valid.
    """


class CorruptedFileStorageError(FileStorageError,
                                StorageSystemError):
    """Corrupted file storage."""


class CorruptedTransactionError(CorruptedFileStorageError):
    pass


class FileStorageQuotaError(FileStorageError,
                            StorageSystemError):
    """File storage quota exceeded."""


# Intended to be raised only in fspack.py, and ignored here.
class RedundantPackWarning(FileStorageError):
    pass


class TempFormatter(FileStorageFormatter):
    """Helper class used to read formatted FileStorage data."""

    def __init__(self, afile):
        self._file = afile


@implementer(
    IStorageRestoreable,
    IStorageIteration,
    IStorageUndoable,
    IStorageCurrentRecordIteration,
    IExternalGC,
    IStorage,
    )
class FileStorage(
    FileStorageFormatter,
    BlobStorageMixin,
    ConflictResolvingStorage,
    BaseStorage,
    ):
    """Storage that saves data in a file
    """

    # Set True while a pack is in progress; undo is blocked for the duration.
    _pack_is_in_progress = False

    def __init__(self, file_name, create=False, read_only=False, stop=None,
                 quota=None, pack_gc=True, pack_keep_old=True, packer=None,
                 blob_dir=None):
        """Create a file storage

        :param str file_name: Path to store data file
        :param bool create: Flag indicating whether a file should be
            created even if it already exists.
        :param bool read_only: Flag indicating whether the file is
            read only. Only one process is able to open the file
            non-read-only.
        :param bytes stop: Time-travel transaction id
            When the file is opened, data will be read up to the given
            transaction id.  Transaction ids correspond to times and
            you can compute transaction ids for a given time using
            :class:`~ZODB.TimeStamp.TimeStamp`.
        :param int quota: File-size quota
        :param bool pack_gc: Flag indicating whether garbage
            collection should be performed when packing.
        :param bool pack_keep_old: flag indicating whether old data
            files should be retained after packing as a ``.old`` file.
        :param callable packer: An alternative
            :interface:`packer <ZODB.FileStorage.interfaces.IFileStoragePacker>`.
        :param str blob_dir: A blob-directory path name.
            Blobs will be supported if this option is provided.

        A file storage stores data in a single file that behaves like
        a traditional transaction log.  New data records are appended
        to the end of the file.  Periodically, the file is packed to
        free up space.  When this is done, current records as of the
        pack time or later are copied to a new file, which replaces
        the old file.

        FileStorages keep in-memory indexes mapping object oids to the
        location of their current records in the file.  Back pointers to
        previous records allow access to non-current records from the
        current records.

        In addition to the data file, some ancillary files are
        created.  These can be lost without affecting data
        integrity, however losing the index file may cause extremely
        slow startup.  Each has a name that's a concatenation of the
        original file and a suffix.  The files are listed below by
        suffix:

        .index
           Snapshot of the in-memory index.  These are created on
           shutdown, packing, and after rebuilding an index when one
           was not found.  For large databases, creating a
           file-storage object without an index file can take very
           long because it's necessary to scan the data file to build
           the index.

        .lock
           A lock file preventing multiple processes from opening a
           file storage in non-read-only mode.

        .tmp
           A file used to store data being committed in the first phase
           of 2-phase commit

        .index_tmp
           A temporary file used when saving the in-memory index to
           avoid overwriting an existing index until a new index has
           been fully saved.

        .pack
           A temporary file written while packing containing current
           records as of and after the pack time.

        .old
           The previous database file after a pack.

        When the database is packed, current records as of the pack
        time and later are written to the ``.pack`` file.  At the end
        of packing, the ``.old`` file is removed, if it exists, the
        data file is renamed to the ``.old`` file, and finally the
        ``.pack`` file is renamed to the data file.
        """

        if read_only:
            self._is_read_only = True
            if create:
                raise ValueError("can't create a read-only file")
        elif stop is not None:
            raise ValueError("time-travel only supported in read-only mode")

        if stop is None:
            stop = b'\377'*8

        # Lock the database and set up the temp file.
        if not read_only:
            # Create the lock file
            self._lock_file = LockFile(file_name + '.lock')
            self._tfile = open(file_name + '.tmp', 'w+b')
            self._tfmt = TempFormatter(self._tfile)
        else:
            self._tfile = None

        self._file_name = os.path.abspath(file_name)

        self._pack_gc = pack_gc
        self.pack_keep_old = pack_keep_old
        if packer is not None:
            self.packer = packer

        BaseStorage.__init__(self, file_name)

        index, tindex = self._newIndexes()
        self._initIndex(index, tindex)

        # Now open the file

        self._file = None
        if not create:
            try:
                self._file = open(file_name, read_only and 'rb' or 'r+b')
            except IOError as exc:
                if exc.errno == errno.EFBIG:
                    # The file is too big to open.  Fail visibly.
                    raise
                if read_only:
                    # When open request is read-only we do not want to create
                    # the file
                    raise
                if exc.errno == errno.ENOENT:
                    # The file doesn't exist.  Create it.
                    create = 1
                # If something else went wrong, it's hard to guess
                # what the problem was.  If the file does not exist,
                # create it.  Otherwise, fail.
                if os.path.exists(file_name):
                    raise
                else:
                    create = 1

        if self._file is None and create:
            if os.path.exists(file_name):
                os.remove(file_name)
            self._file = open(file_name, 'w+b')
            self._file.write(packed_version)

        self._files = FilePool(self._file_name)
        r = self._restore_index()
        if r is not None:
            self._used_index = 1  # Marker for testing
            index, start, ltid = r

            self._initIndex(index, tindex)
            self._pos, self._oid, tid = read_index(
                self._file, file_name, index, tindex, stop,
                ltid=ltid, start=start, read_only=read_only,
                )
        else:
            self._used_index = 0  # Marker for testing
            self._pos, self._oid, tid = read_index(
                self._file, file_name, index, tindex, stop,
                read_only=read_only,
                )
            self._save_index()

        self._ltid = tid

        # self._pos should always point just past the last
        # transaction.  During 2PC, data is written after _pos.
        # The invariant is restored at tpc_abort() or tpc_finish().

        self._ts = tid = TimeStamp(tid)
        t = time.time()
        t = TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
        if tid > t:
            seconds = tid.timeTime() - t.timeTime()
            complainer = logger.warning
            if seconds > 30 * 60:   # 30 minutes -- way screwed up
                complainer = logger.critical
            complainer("%s Database records %d seconds in the future",
                       file_name, seconds)

        self._quota = quota

        if blob_dir:
            self.blob_dir = os.path.abspath(blob_dir)
            if create and os.path.exists(self.blob_dir):
                remove_committed_dir(self.blob_dir)

            self._blob_init(blob_dir)
            alsoProvides(self, IBlobStorageRestoreable)
        else:
            self.blob_dir = None
            self._blob_init_no_blobs()
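
    # A minimal usage sketch (illustrative only; 'Data.fs' and 'blobs' are
    # placeholder paths).  A FileStorage is normally wrapped in a ZODB.DB:
    #
    #     import ZODB
    #     from ZODB.FileStorage import FileStorage
    #
    #     storage = FileStorage('Data.fs', blob_dir='blobs')
    #     db = ZODB.DB(storage)
    #     with db.transaction() as conn:
    #         conn.root()['answer'] = 42
    #     db.close()  # close() also snapshots the in-memory index (.index)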

    def copyTransactionsFrom(self, other):
        if self.blob_dir:
            return BlobStorageMixin.copyTransactionsFrom(self, other)
        else:
            return BaseStorage.copyTransactionsFrom(self, other)

    def _initIndex(self, index, tindex):
        self._index = index
        self._tindex = tindex
        self._index_get = index.get

    def __len__(self):
        return len(self._index)

    def _newIndexes(self):
        # hook to use something other than builtin dict
        return fsIndex(), {}

    _saved = 0

    def _save_index(self):
        """Write the database index to a file to support quick startup."""

        if self._is_read_only:
            return

        index_name = self.__name__ + '.index'
        tmp_name = index_name + '.index_tmp'

        self._index.save(self._pos, tmp_name)

        try:
            try:
                os.remove(index_name)
            except OSError:
                pass
            os.rename(tmp_name, index_name)
        except:
            pass

        self._saved += 1

    def _clear_index(self):
        index_name = self.__name__ + '.index'
        if os.path.exists(index_name):
            try:
                os.remove(index_name)
            except OSError:
                pass

    def _sane(self, index, pos):
        """Sanity check saved index data by reading the last undone trans

        Basically, we read the last not undone transaction and
        check to see that the included records are consistent
        with the index.  Any invalid records or inconsistent
        object positions cause zero to be returned.
        """
        r = self._check_sanity(index, pos)
        if not r:
            logger.warning("Ignoring index for %s", self._file_name)
        return r

    def _check_sanity(self, index, pos):

        if pos < 100:
            return 0  # insane
        self._file.seek(0, 2)
        if self._file.tell() < pos:
            return 0  # insane
        ltid = None

        max_checked = 5
        checked = 0

        while checked < max_checked:
            self._file.seek(pos - 8)
            rstl = self._file.read(8)
            tl = u64(rstl)
            pos = pos - tl - 8
            if pos < 4:
                return 0  # insane
            h = self._read_txn_header(pos)
            if not ltid:
                ltid = h.tid
            if h.tlen != tl:
                return 0  # inconsistent lengths
            if h.status == 'u':
                continue  # undone trans, search back
            if h.status not in ' p':
                return 0  # insane
            if tl < h.headerlen():
                return 0  # insane
            tend = pos + tl
            opos = pos + h.headerlen()
            if opos == tend:
                continue  # empty trans

            while opos < tend and checked < max_checked:
                # Read the data records for this transaction
                h = self._read_data_header(opos)

                if opos + h.recordlen() > tend or h.tloc != pos:
                    return 0

                if index.get(h.oid, 0) != opos:
                    return 0  # insane

                checked += 1

                opos = opos + h.recordlen()

        return ltid

    def _restore_index(self):
        """Load database index to support quick startup."""
        # Returns (index, pos, tid), or None in case of error.
        # The index returned is always an instance of fsIndex.  If the
        # index cached in the file is a Python dict, it's converted to
        # fsIndex here, and, if we're not in read-only mode, the .index
        # file is rewritten with the converted fsIndex so we don't need to
        # convert it again the next time.
        file_name = self.__name__
        index_name = file_name + '.index'

        if os.path.exists(index_name):
            try:
                info = fsIndex.load(index_name)
            except:
                logger.exception('loading index')
                return None
        else:
            return None

        index = info.get('index')
        pos = info.get('pos')
        if index is None or pos is None:
            return None
        pos = int(pos)

        if (isinstance(index, dict) or
                (isinstance(index, fsIndex) and
                 isinstance(index._data, dict))):
            # Convert dictionary indexes to fsIndexes *or* convert fsIndexes
            # which have a dict `_data` attribute to a new fsIndex (newer
            # fsIndexes have an OOBTree as `_data`).
            newindex = fsIndex()
            newindex.update(index)
            index = newindex
            if not self._is_read_only:
                # Save the converted index.
                f = open(index_name, 'wb')
                p = Pickler(f, _protocol)
                info['index'] = index
                p.dump(info)
                f.close()
                # Now call this method again to get the new data.
                return self._restore_index()

        tid = self._sane(index, pos)
        if not tid:
            return None

        return index, pos, tid

    def close(self):
        self._file.close()
        self._files.close()
        if hasattr(self, '_lock_file'):
            self._lock_file.close()
        if self._tfile:
            self._tfile.close()
        try:
            self._save_index()
        except:
            # Log the error and continue
            logger.exception("Error saving index on close()")

    def getSize(self):
        return self._pos

    def _lookup_pos(self, oid):
        try:
            return self._index[oid]
        except KeyError:
            raise POSKeyError(oid)
        except TypeError:
            raise TypeError("invalid oid %r" % (oid,))

    def load(self, oid, version=''):
        """Return pickle data and serial number."""
        assert not version

        with self._files.get() as _file:
            pos = self._lookup_pos(oid)
            h = self._read_data_header(pos, oid, _file)
            if h.plen:
                data = _file.read(h.plen)
                return data, h.tid
            elif h.back:
                # Get the data from the backpointer, but tid from
                # current txn.
                data = self._loadBack_impl(oid, h.back, _file=_file)[0]
                return data, h.tid
            else:
                raise POSKeyError(oid)

    def loadSerial(self, oid, serial):
        with self._lock:
            pos = self._lookup_pos(oid)
            while 1:
                h = self._read_data_header(pos, oid)
                if h.tid == serial:
                    break
                pos = h.prev
                if h.tid < serial or not pos:
                    raise POSKeyError(oid)
            if h.plen:
                return self._file.read(h.plen)
            else:
                return self._loadBack_impl(oid, h.back)[0]

    def loadBefore(self, oid, tid):
        with self._files.get() as _file:
            pos = self._lookup_pos(oid)
            end_tid = None
            while True:
                h = self._read_data_header(pos, oid, _file)
                if h.tid < tid:
                    break

                pos = h.prev
                end_tid = h.tid
                if not pos:
                    return None

            if h.plen:
                return _file.read(h.plen), h.tid, end_tid
            elif h.back:
                data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
                return data, h.tid, end_tid
            else:
                raise POSKeyError(oid)
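
    # Sketch of reading a historical revision with loadBefore() (illustrative
    # only; oid and tid are 8-byte identifiers):
    #
    #     r = storage.loadBefore(oid, tid)
    #     if r is not None:
    #         data, start_tid, end_tid = r
    #         # data was committed in start_tid; end_tid is the tid of the
    #         # superseding transaction, or None if this revision is current.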

    def store(self, oid, oldserial, data, version, transaction):
        if self._is_read_only:
            raise ReadOnlyError()
        if transaction is not self._transaction:
            raise StorageTransactionError(self, transaction)
        assert not version

        with self._lock:
            if oid > self._oid:
                self.set_max_oid(oid)
            old = self._index_get(oid, 0)
            committed_tid = None
            pnv = None
            if old:
                h = self._read_data_header(old, oid)
                committed_tid = h.tid

                if oldserial != committed_tid:
                    data = self.tryToResolveConflict(oid, committed_tid,
                                                     oldserial, data)
                    self._resolved.append(oid)

            pos = self._pos
            here = pos + self._tfile.tell() + self._thl
            self._tindex[oid] = here
            new = DataHeader(oid, self._tid, old, pos, 0, len(data))

            self._tfile.write(new.asString())
            self._tfile.write(data)

            # Check quota
            if self._quota is not None and here > self._quota:
                raise FileStorageQuotaError(
                    "The storage quota has been exceeded.")

    def deleteObject(self, oid, oldserial, transaction):
        if self._is_read_only:
            raise ReadOnlyError()
        if transaction is not self._transaction:
            raise StorageTransactionError(self, transaction)

        with self._lock:
            old = self._index_get(oid, 0)
            if not old:
                raise POSKeyError(oid)
            h = self._read_data_header(old, oid)
            committed_tid = h.tid

            if oldserial != committed_tid:
                raise ConflictError(
                    oid=oid, serials=(committed_tid, oldserial))

            pos = self._pos
            here = pos + self._tfile.tell() + self._thl
            self._tindex[oid] = here
            new = DataHeader(oid, self._tid, old, pos, 0, 0)
            self._tfile.write(new.asString())
            self._tfile.write(z64)

            # Check quota
            if self._quota is not None and here > self._quota:
                raise FileStorageQuotaError(
                    "The storage quota has been exceeded.")

    def _data_find(self, tpos, oid, data):
        # Return backpointer for oid.  Must call with the lock held.
        # This is a file offset to oid's data record if found, else 0.
        # The data records in the transaction at tpos are searched for oid.
        # If a data record for oid isn't found, returns 0.
        # Else if oid's data record contains a backpointer, that
        # backpointer is returned.
        # Else oid's data record contains the data, and the file offset of
        # oid's data record is returned.  This data record should contain
        # a pickle identical to the 'data' argument.

        # Unclear: If the length of the stored data doesn't match len(data),
        # an exception is raised.  If the lengths match but the data isn't
        # the same, 0 is returned.  Why the discrepancy?
        self._file.seek(tpos)
        h = self._file.read(TRANS_HDR_LEN)
        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        status = as_text(status)
        self._file.read(ul + dl + el)
        tend = tpos + tl + 8
        pos = self._file.tell()
        while pos < tend:
            h = self._read_data_header(pos)
            if h.oid == oid:
                # Make sure this looks like the right data record
                if h.plen == 0:
                    # This is also a backpointer.  Gotta trust it.
                    return pos
                if h.plen != len(data):
                    # The expected data doesn't match what's in the
                    # backpointer.  Something is wrong.
                    logger.error("Mismatch between data and"
                                 " backpointer at %d", pos)
                    return 0
                _data = self._file.read(h.plen)
                if data != _data:
                    return 0
                return pos
            pos += h.recordlen()
            self._file.seek(pos)
        return 0

    def restore(self, oid, serial, data, version, prev_txn, transaction):
        # A lot like store() but without all the consistency checks.  This
        # should only be used when we /know/ the data is good, hence the
        # method name.  While the signature looks like store() there are some
        # differences:
        #
        # - serial is the serial number of /this/ revision, not of the
        #   previous revision.  It is used instead of self._tid, which is
        #   ignored.
        #
        # - Nothing is returned
        #
        # - data can be None, which indicates a George Bailey object
        #   (i.e. one whose creation has been transactionally undone).
        #
        # prev_txn is a backpointer.  In the original database, it's possible
        # that the data was actually living in a previous transaction.  This
        # can happen for transactional undo and other operations, and is used
        # as a space saving optimization.  Under some circumstances the
        # prev_txn may not actually exist in the target database (i.e. self),
        # for example, if it's been packed away.  In that case, the prev_txn
        # should be considered just a hint, and is ignored if the transaction
        # doesn't exist.
        if self._is_read_only:
            raise ReadOnlyError()
        if transaction is not self._transaction:
            raise StorageTransactionError(self, transaction)
        if version:
            raise TypeError("Versions are no-longer supported")

        with self._lock:
            if oid > self._oid:
                self.set_max_oid(oid)
            prev_pos = 0
            if prev_txn is not None:
                prev_txn_pos = self._txn_find(prev_txn, 0)
                if prev_txn_pos:
                    prev_pos = self._data_find(prev_txn_pos, oid, data)
            old = self._index_get(oid, 0)
            # Calculate the file position in the temporary file
            here = self._pos + self._tfile.tell() + self._thl
            # And update the temp file index
            self._tindex[oid] = here
            if prev_pos:
                # If there is a valid prev_pos, don't write data.
                data = None
            if data is None:
                dlen = 0
            else:
                dlen = len(data)

            # Write the recovery data record
            new = DataHeader(oid, serial, old, self._pos, 0, dlen)

            self._tfile.write(new.asString())

            # Finally, write the data or a backpointer.
            if data is None:
                if prev_pos:
                    self._tfile.write(p64(prev_pos))
                else:
                    # Write a zero backpointer, which indicates an
                    # un-creation transaction.
                    self._tfile.write(z64)
            else:
                self._tfile.write(data)

    def supportsUndo(self):
        return 1

    def _clear_temp(self):
        self._tindex.clear()
        if self._tfile is not None:
            self._tfile.seek(0)

    def _begin(self, tid, u, d, e):
        self._nextpos = 0
        self._thl = TRANS_HDR_LEN + len(u) + len(d) + len(e)
        if self._thl > 65535:
            # one of u, d, or e may be > 65535
            # We have to check lengths here because struct.pack
            # doesn't raise an exception on overflow!
            if len(u) > 65535:
                raise FileStorageError('user name too long')
            if len(d) > 65535:
                raise FileStorageError('description too long')
            if len(e) > 65535:
                raise FileStorageError('too much extension data')

    def tpc_vote(self, transaction):
        with self._lock:
            if transaction is not self._transaction:
                raise StorageTransactionError(
                    "tpc_vote called with wrong transaction")
            dlen = self._tfile.tell()
            self._tfile.seek(0)
            user, descr, ext = self._ude

            self._file.seek(self._pos)
            tl = self._thl + dlen

            try:
                h = TxnHeader(self._tid, tl, "c", len(user),
                              len(descr), len(ext))
                h.user = user
                h.descr = descr
                h.ext = ext
                self._file.write(h.asString())
                cp(self._tfile, self._file, dlen)
                self._file.write(p64(tl))
                self._file.flush()
            except:
                # Hm, an error occurred writing out the data.  Maybe the
                # disk is full.  We don't want any turd at the end.
                self._file.truncate(self._pos)
                self._files.flush()
                raise
            self._nextpos = self._pos + (tl + 8)
            return self._resolved

    def tpc_finish(self, transaction, f=None):
        with self._files.write_lock():
            with self._lock:
                if transaction is not self._transaction:
                    raise StorageTransactionError(
                        "tpc_finish called with wrong transaction")
                try:
                    tid = self._tid
                    if f is not None:
                        f(tid)
                    self._finish(tid, *self._ude)
                    self._clear_temp()
                finally:
                    self._ude = None
                    self._transaction = None
                    self._commit_lock.release()
        return tid
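
    # The overall two-phase-commit sequence, as driven by a transaction
    # manager (sketch; tpc_begin()/tpc_abort() come from BaseStorage):
    #
    #     storage.tpc_begin(txn)
    #     storage.store(oid, oldserial, data, '', txn)
    #     resolved = storage.tpc_vote(txn)  # records land with status 'c'
    #     storage.tpc_finish(txn)           # _finish() clears the 'c' flag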

    def _finish(self, tid, u, d, e):
        # Clear the checkpoint flag
        self._file.seek(self._pos + 16)
        self._file.write(as_bytes(self._tstatus))
        try:
            # At this point, we may have committed the data to disk.
            # If we fail from here, we're in bad shape.
            self._finish_finish(tid)
        except:
            # Ouch.  This is bad.  Let's try to get back to where we were
            # and then roll over and die
            logger.critical("Failure in _finish. Closing.", exc_info=True)
            self.close()
            raise

    def _finish_finish(self, tid):
        # This is a separate method to allow tests to replace it with
        # something broken. :)

        self._file.flush()
        if fsync is not None:
            fsync(self._file.fileno())

        self._pos = self._nextpos
        self._index.update(self._tindex)
        self._ltid = tid
        self._blob_tpc_finish()

    def _abort(self):
        if self._nextpos:
            self._file.truncate(self._pos)
            self._files.flush()
            self._nextpos = 0
        self._blob_tpc_abort()

    def _undoDataInfo(self, oid, pos, tpos):
        """Return the tid, data pointer, and data for the oid record at pos
        """
        if tpos:
            itpos = tpos - self._pos - self._thl
            pos = tpos
            tpos = self._tfile.tell()
            h = self._tfmt._read_data_header(itpos, oid)
            afile = self._tfile
        else:
            h = self._read_data_header(pos, oid)
            afile = self._file

        if h.oid != oid:
            raise UndoError("Invalid undo transaction id", oid)

        if h.plen:
            data = afile.read(h.plen)
        else:
            data = ''
            pos = h.back

        if tpos:
            self._tfile.seek(tpos)  # Restore temp file to end

        return h.tid, pos, data

    def getTid(self, oid):
        with self._lock:
            pos = self._lookup_pos(oid)
            h = self._read_data_header(pos, oid)
            if h.plen == 0 and h.back == 0:
                # Undone creation
                raise POSKeyError(oid)
            return h.tid

    def _transactionalUndoRecord(self, oid, pos, tid, pre):
        """Get the undo information for a data record

        'pos' points to the data header for 'oid' in the transaction
        being undone.  'tid' refers to the transaction being undone.
        'pre' is the 'prev' field of the same data header.

        Return a 3-tuple consisting of a pickle, data pointer, and
        current position.  If the pickle is true, then the data
        pointer must be 0, but the pickle can be empty *and* the
        pointer 0.
        """

        copy = True  # Can we just copy a data pointer

        # First check if it is possible to undo this record.
        tpos = self._tindex.get(oid, 0)
        ipos = self._index.get(oid, 0)
        tipos = tpos or ipos

        if tipos != pos:
            # The transaction being undone isn't current because:
            # a) A later transaction was committed ipos != pos, or
            # b) A change was made in the current transaction.  This
            #    could only be a previous undo in a multi-undo.
            #    (We don't allow multiple data managers with the same
            #    storage to participate in the same transaction.)
            assert tipos > pos

            # Get current data, as identified by tipos.  We'll use
            # it to decide if and how we can undo in this case.
            ctid, cdataptr, current_data = self._undoDataInfo(oid, ipos, tpos)

            if cdataptr != pos:

                # if cdataptr was == pos, then we'd be cool, because
                # we're dealing with the same data.

                # Because they aren't equal, we have to dig deeper

                # Let's see if data to be undone and current data
                # are the same.  If not, we'll have to decide whether
                # we should try conflict resolution.

                try:
                    data_to_be_undone = self._loadBack_impl(oid, pos)[0]
                    if not current_data:
                        current_data = self._loadBack_impl(oid, cdataptr)[0]

                    if data_to_be_undone != current_data:
                        # OK, so the current data is different from
                        # the data being undone.  We can't just copy:
                        copy = False

                        if not pre:
                            # The transaction we're undoing has no
                            # previous state to merge with, so we
                            # can't resolve a conflict.
                            raise UndoError(
                                "Can't undo an add transaction followed by"
                                " conflicting transactions.", oid)
                except KeyError:
                    # LoadBack gave us a key error.  Bail.
                    raise UndoError("_loadBack() failed", oid)

        # Return the data that should be written in the undo record.
        if not pre:
            # We're undoing object addition.  We're doing this because
            # subsequent transactions have no net effect on the state
            # (possibly because some of them were undos).
            return "", 0, ipos

        if copy:
            # we can just copy our previous-record pointer forward
            return "", pre, ipos

        try:
            pre_data = self._loadBack_impl(oid, pre)[0]
        except KeyError:
            # couldn't find oid; what's the real explanation for this?
            raise UndoError("_loadBack() failed for %s", oid)

        try:
            data = self.tryToResolveConflict(
                oid, ctid, tid, pre_data, current_data)
            return data, 0, ipos
        except ConflictError:
            pass

        raise UndoError("Some data were modified by a later transaction", oid)

    # undoLog() returns a description dict that includes an id entry.
    # The id is opaque to the client, but contains the transaction id.
    # The transactionalUndo() implementation does a simple linear
    # search through the file (from the end) to find the transaction.

    def undoLog(self, first=0, last=-20, filter=None):
        if last < 0:
            # -last is supposed to be the max # of transactions.  Convert to
            # a positive index.  Should have x - first = -last, which
            # means x = first - last.  This is spelled out here because
            # the normalization code was incorrect for years (used +1
            # instead -- off by 1), until ZODB 3.4.
            last = first - last
        with self._lock:
            if self._pack_is_in_progress:
                raise UndoError(
                    'Undo is currently disabled for database maintenance.<p>')
            us = UndoSearch(self._file, self._pos, first, last, filter)
            while not us.finished():
                # Hold lock for batches of 20 searches, so default search
                # parameters will finish without letting another thread run.
                for i in range(20):
                    if us.finished():
                        break
                    us.search()
                # Give another thread a chance, so that a long undoLog()
                # operation doesn't block all other activity.
                self._lock.release()
                self._lock.acquire()
            return us.results

    def undo(self, transaction_id, transaction):
        """Undo a transaction, given by transaction_id.

        Do so by writing new data that reverses the action taken by
        the transaction.

        Usually, we can get by with just copying a data pointer, by
        writing a file position rather than a pickle.  Sometimes, we
        may do conflict resolution, in which case we actually copy
        new data that results from resolution.
        """

        if self._is_read_only:
            raise ReadOnlyError()
        if transaction is not self._transaction:
            raise StorageTransactionError(self, transaction)

        with self._lock:
            # Find the right transaction to undo and call _txn_undo_write().
            tid = decodebytes(transaction_id + b'\n')
            assert len(tid) == 8
            tpos = self._txn_find(tid, 1)
            tindex = self._txn_undo_write(tpos)
            self._tindex.update(tindex)
            return self._tid, tindex.keys()
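
    # Round trip between undoLog() and undo() (sketch): the 'id' entries
    # returned by undoLog() are base64-encoded tids, which undo() decodes
    # again with decodebytes():
    #
    #     descriptions = storage.undoLog(0, -20)
    #     storage.tpc_begin(txn)
    #     storage.undo(descriptions[0]['id'], txn)
    #     storage.tpc_vote(txn)
    #     storage.tpc_finish(txn)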

    def _txn_find(self, tid, stop_at_pack):
        pos = self._pos
        while pos > 39:
            self._file.seek(pos - 8)
            pos = pos - u64(self._file.read(8)) - 8
            self._file.seek(pos)
            h = self._file.read(TRANS_HDR_LEN)
            _tid = h[:8]
            if _tid == tid:
                return pos
            if stop_at_pack:
                # check the status field of the transaction header
                # (slice rather than index so this works on both Python 2
                # str and Python 3 bytes)
                if h[16:17] == b'p':
                    break
        raise UndoError("Invalid transaction id")

    def _txn_undo_write(self, tpos):
        # a helper function to write the data records for transactional undo

        otloc = self._pos
        here = self._pos + self._tfile.tell() + self._thl
        base = here - self._tfile.tell()
        # Let's move the file pointer back to the start of the txn record.
        th = self._read_txn_header(tpos)
        if th.status != " ":
            raise UndoError('non-undoable transaction')
        tend = tpos + th.tlen
        pos = tpos + th.headerlen()
        tindex = {}

        # keep track of failures, cause we may succeed later
        failures = {}
        # Read the data records for this transaction
        while pos < tend:
            h = self._read_data_header(pos)
            if h.oid in failures:
                del failures[h.oid]  # second chance!

            assert base + self._tfile.tell() == here, (here, base,
                                                       self._tfile.tell())
            try:
                p, prev, ipos = self._transactionalUndoRecord(
                    h.oid, pos, h.tid, h.prev)
            except UndoError as v:
                # Don't fail right away.  We may be redeemed later!
                failures[h.oid] = v
            else:

                if self.blob_dir and not p and prev:
                    try:
                        up, userial = self._loadBackTxn(h.oid, prev)
                    except POSKeyError:
                        pass  # It was removed, so no need to copy data
                    else:
                        if self.is_blob_record(up):
                            # We're undoing a blob modification operation.
                            # We have to copy the blob data
                            tmp = mktemp(dir=self.fshelper.temp_dir)
                            with self.openCommittedBlobFile(
                                    h.oid, userial) as sfp:
                                with open(tmp, 'wb') as dfp:
                                    cp(sfp, dfp)
                            self._blob_storeblob(h.oid, self._tid, tmp)

                new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p))

                # TODO:  This seek shouldn't be necessary, but some other
                # bit of code is messing with the file pointer.
                assert self._tfile.tell() == here - base, (here, base,
                                                           self._tfile.tell())
                self._tfile.write(new.asString())
                if p:
                    self._tfile.write(p)
                else:
                    self._tfile.write(p64(prev))
                tindex[h.oid] = here
                here += new.recordlen()

            pos += h.recordlen()
            if pos > tend:
                raise UndoError("non-undoable transaction")

        if failures:
            raise MultipleUndoErrors(list(failures.items()))

        return tindex

    def history(self, oid, size=1, filter=None):
        with self._lock:
            r = []
            pos = self._lookup_pos(oid)

            while 1:
                if len(r) >= size:
                    return r
                h = self._read_data_header(pos)

                th = self._read_txn_header(h.tloc)
                if th.ext:
                    d = loads(th.ext)
                else:
                    d = {}

                d.update({"time": TimeStamp(h.tid).timeTime(),
                          "user_name": th.user,
                          "description": th.descr,
                          "tid": h.tid,
                          "size": h.plen,
                          })

                if filter is None or filter(d):
                    r.append(d)

                if h.prev:
                    pos = h.prev
                else:
                    return r

    def _redundant_pack(self, file, pos):
        assert pos > 8, pos
        file.seek(pos - 8)
        p = u64(file.read(8))
        file.seek(pos - p + 8)
        return file.read(1) not in b' u'

    @staticmethod
    def packer(storage, referencesf, stop, gc):
        # Our default packer is built around the original packer.  We
        # simply adapt the old interface to the new.  We don't really
        # want to invest much in the old packer, at least for now.
        assert referencesf is not None
        p = FileStoragePacker(storage, referencesf, stop, gc)
        try:
            opos = p.pack()
            if opos is None:
                return None
            return opos, p.index
        finally:
            p.close()

    def pack(self, t, referencesf, gc=None):
        """Copy data from the current database file to a packed file

        Non-current records from transactions with time-stamp strings less
        than packtss are omitted, as are all undone records.

        Also, data back pointers that point before packtss are resolved and
        the associated data are copied, since the old records are not copied.
        """
        if self._is_read_only:
            raise ReadOnlyError()

        stop = TimeStamp(*time.gmtime(t)[:5] + (t % 60,)).raw()
        if stop == z64:
            raise FileStorageError('Invalid pack time')

        # If the storage is empty, there's nothing to do.
        if not self._index:
            return

        with self._lock:
            if self._pack_is_in_progress:
                raise FileStorageError('Already packing')
            self._pack_is_in_progress = True

        if gc is None:
            gc = self._pack_gc

        oldpath = self._file_name + ".old"
        if os.path.exists(oldpath):
            os.remove(oldpath)
        if self.blob_dir and os.path.exists(self.blob_dir + ".old"):
            remove_committed_dir(self.blob_dir + ".old")

        cleanup = []

        have_commit_lock = False
        try:
            pack_result = None
            try:
                pack_result = self.packer(self, referencesf, stop, gc)
            except RedundantPackWarning as detail:
                logger.info(str(detail))
            if pack_result is None:
                return
            have_commit_lock = True
            opos, index = pack_result
            with self._files.write_lock():
                with self._lock:
                    self._files.empty()
                    self._file.close()
                    try:
                        os.rename(self._file_name, oldpath)
                    except Exception:
                        self._file = open(self._file_name, 'r+b')
                        raise

                    # OK, we're beyond the point of no return
                    os.rename(self._file_name + '.pack', self._file_name)
                    self._file = open(self._file_name, 'r+b')
                    self._initIndex(index, self._tindex)
                    self._pos = opos

            # We're basically done.  Now we need to deal with removed
            # blobs and removing the .old file (see further down).

            if self.blob_dir:
                self._commit_lock.release()
                have_commit_lock = False
                self._remove_blob_files_tagged_for_removal_during_pack()

        finally:
            if have_commit_lock:
                self._commit_lock.release()
            with self._lock:
                self._pack_is_in_progress = False

        if not self.pack_keep_old:
            os.remove(oldpath)

        with self._lock:
            self._save_index()
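
    # Sketch of a pack call; referencesf is the standard reference extractor
    # from ZODB.serialize.  This packs away anything older than a week:
    #
    #     import time
    #     from ZODB.serialize import referencesf
    #
    #     storage.pack(time.time() - 7 * 86400, referencesf)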

    def _remove_blob_files_tagged_for_removal_during_pack(self):
        lblob_dir = len(self.blob_dir)
        fshelper = self.fshelper
        old = self.blob_dir + '.old'

        # Helper to clean up dirs left empty after moving things to old
        def maybe_remove_empty_dir_containing(path, level=0):
            path = os.path.dirname(path)
            if len(path) <= lblob_dir or os.listdir(path):
                return

            # Path points to an empty dir.  There may be a race.  We
            # might have just removed the dir for an oid (or a parent
            # dir) and while we're cleaning up its parent, another
            # thread is adding a new entry to it.

            # We don't have to worry about level 0, as this is just a
            # directory containing an object's revisions.  If it is
            # empty, the object must have been garbage.

            # If the level is 1 or higher, we need to be more
            # careful.  We'll get the storage lock and double check
            # that the dir is still empty before removing it.

            removed = False
            if level:
                self._lock.acquire()
            try:
                if not os.listdir(path):
                    os.rmdir(path)
                    removed = True
            finally:
                if level:
                    self._lock.release()

            if removed:
                maybe_remove_empty_dir_containing(path, level + 1)

        if self.pack_keep_old:
            # Helpers that move oid dir or revision file to the old dir.
            os.mkdir(old)
            link_or_copy(os.path.join(self.blob_dir, '.layout'),
                         os.path.join(old, '.layout'))

            def handle_file(path):
                newpath = old + path[lblob_dir:]
                dest = os.path.dirname(newpath)
                if not os.path.exists(dest):
                    os.makedirs(dest)
                os.rename(path, newpath)
            handle_dir = handle_file
        else:
            # Helpers that remove an oid dir or revision file.
            handle_file = remove_committed
            handle_dir = remove_committed_dir

        # First step: move or remove oids or revisions
        with open(os.path.join(self.blob_dir, '.removed'), 'rb') as fp:
            for line in fp:
                line = binascii.unhexlify(line.strip())

                if len(line) == 8:
                    # oid is garbage, re/move dir
                    path = fshelper.getPathForOID(line)
                    if not os.path.exists(path):
                        # Hm, already gone.  Odd.
                        continue
                    handle_dir(path)
                    maybe_remove_empty_dir_containing(path, 1)
                    continue

                if len(line) != 16:
                    raise ValueError(
                        "Bad record in ", self.blob_dir, '.removed')

                oid, tid = line[:8], line[8:]
                path = fshelper.getBlobFilename(oid, tid)
                if not os.path.exists(path):
                    # Hm, already gone.  Odd.
                    continue
                handle_file(path)
                assert not os.path.exists(path)
                maybe_remove_empty_dir_containing(path)

        os.remove(os.path.join(self.blob_dir, '.removed'))

        if not self.pack_keep_old:
            return

        # Second step, copy remaining files.
        for path, dir_names, file_names in os.walk(self.blob_dir):
            for file_name in file_names:
                if not file_name.endswith('.blob'):
                    continue
                file_path = os.path.join(path, file_name)
                dest = os.path.dirname(old + file_path[lblob_dir:])
                if not os.path.exists(dest):
                    os.makedirs(dest)
                link_or_copy(file_path, old + file_path[lblob_dir:])

    def iterator(self, start=None, stop=None):
        return FileIterator(self._file_name, start, stop)

    def lastInvalidations(self, count):
        file = self._file
        seek = file.seek
        read = file.read
        with self._lock:
            pos = self._pos
            while count > 0 and pos > 4:
                count -= 1
                seek(pos - 8)
                pos = pos - 8 - u64(read(8))

            seek(0)
            return [(trans.tid, [r.oid for r in trans])
                    for trans in FileIterator(self._file_name, pos=pos)]

    def lastTid(self, oid):
        """Return last serialno committed for object oid.

        If there is no serialno for this oid -- which can only occur
        if it is a new object -- return None.
        """
        try:
            return self.getTid(oid)
        except KeyError:
            return None

    def cleanup(self):
        """Remove all files created by this storage."""
        for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
            try:
                os.remove(self._file_name + ext)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise

    def record_iternext(self, next=None):
        index = self._index
        oid = index.minKey(next)

        oid_as_long, = unpack(">Q", oid)
        next_oid = pack(">Q", oid_as_long + 1)
        try:
            next_oid = index.minKey(next_oid)
        except ValueError:  # "empty tree" error
            next_oid = None

        data, tid = load_current(self, oid)

        return oid, tid, data, next_oid

    ######################################################################
    # The following 2 methods are for testing a ZEO extension mechanism
    def getExtensionMethods(self):
        return dict(answer_to_the_ultimate_question=None)

    def answer_to_the_ultimate_question(self):
        return 42
    #
    ######################################################################
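
# Sketch of walking all current records with record_iternext() above
# (illustrative only):
#
#     next_oid = None
#     while True:
#         oid, tid, data, next_oid = storage.record_iternext(next_oid)
#         ...  # process one current record
#         if next_oid is None:
#             break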


def shift_transactions_forward(index, tindex, file, pos, opos):
    """Copy transactions forward in the data file

    This might be done as part of a recovery effort
    """

    # Cache a bunch of methods
    seek = file.seek
    read = file.read
    write = file.write

    index_get = index.get

    # Initialize,
    pv = z64
    p1 = opos
    p2 = pos
    offset = p2 - p1

    # Copy the data in two stages.  In the packing stage,
    # we skip records that are non-current or that are for
    # unreferenced objects.  We also skip undone transactions.
    #
    # After the packing stage, we copy everything but undone
    # transactions, however, we have to update various back pointers.
    # We have to have the storage lock in the second phase to keep
    # data from being changed while we're copying.
    pnv = None
    while 1:

        # Read the transaction record
        seek(pos)
        h = read(TRANS_HDR_LEN)
        if len(h) < TRANS_HDR_LEN:
            break
        tid, stl, status, ul, dl, el = unpack(TRANS_HDR, h)
        status = as_text(status)
        if status == 'c':
            break  # Oops, we found a checkpoint flag.
        tl = u64(stl)
        tpos = pos
        tend = tpos + tl

        otpos = opos  # start pos of output trans

        thl = ul + dl + el
        h2 = read(thl)
        if len(h2) != thl:
            raise PackError(opos)

        # write out the transaction record
        seek(opos)
        write(h)
        write(h2)

        thl = TRANS_HDR_LEN + thl
        pos = tpos + thl
        opos = otpos + thl

        while pos < tend:
            # Read the data records for this transaction
            seek(pos)
            h = read(DATA_HDR_LEN)
            oid, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
            assert not vlen
            plen = u64(splen)
            dlen = DATA_HDR_LEN + (plen or 8)

            tindex[oid] = opos

            if plen:
                p = read(plen)
            else:
                p = read(8)
                p = u64(p)
                if p >= p2:
                    p = p - offset
                elif p >= p1:
                    # Ick, we're in trouble.  Let's bail
                    # to the index and hope for the best
                    p = index_get(oid, 0)
                p = p64(p)

            # WRITE
            seek(opos)
            sprev = p64(index_get(oid, 0))
            write(pack(DATA_HDR,
                       oid, serial, sprev, p64(otpos), 0, splen))

            write(p)

            opos = opos + dlen
            pos = pos + dlen

        # skip the (intentionally redundant) transaction length
        pos = pos + 8

        if status != 'u':
            index.update(tindex)  # Record the position

        tindex.clear()

        write(stl)
        opos = opos + 8

    return opos


def search_back(file, pos):
    seek = file.seek
    read = file.read
    seek(0, 2)
    s = p = file.tell()
    while p > pos:
        seek(p - 8)
        l = u64(read(8))
        if l <= 0:
            break
        p = p - l - 8

    return p, s


def recover(file_name):
    file = open(file_name, 'r+b')
    index = {}
    tindex = {}

    pos, oid, tid = read_index(file, file_name, index, tindex, recover=1)
    if oid is not None:
        print("Nothing to recover")
        return

    opos = pos
    pos, sz = search_back(file, pos)
    if pos < sz:
        npos = shift_transactions_forward(index, tindex, file, pos, opos)

    file.truncate(npos)

    print("Recovered file, lost %s, ended up with %s bytes" % (
        pos - opos, npos))


def read_index(file, name, index, tindex, stop=b'\377'*8,
               ltid=z64, start=4, maxoid=z64, recover=0, read_only=0):
    """Scan the file storage and update the index.

    Returns file position, max oid, and last transaction id.  It also
    stores index information in the two dictionary arguments.

    Arguments:
    file -- a file object (the Data.fs)
    name -- the name of the file (presumably file.name)
    index -- fsIndex, oid -> data record file offset
    tindex -- dictionary, oid -> data record offset
              tindex is cleared before return

    There are several default arguments that affect the scan or the
    return values.  TODO:  document them.

    start -- the file position at which to start scanning for oids added
             beyond the ones the passed-in indices know about.  The .index
             file caches the highest ._pos FileStorage knew about when
             the .index file was last saved, and that's the intended value
             to pass in for start; accept the default (and pass empty
             indices) to recreate the index from scratch

    maxoid -- ignored (it meant something prior to ZODB 3.2.6; the argument
              still exists just so the signature of read_index() stayed the
              same)

    The file position returned is the position just after the last
    valid transaction record.  The oid returned is the maximum object
    id in `index`, or z64 if the index is empty.  The transaction id is the
    tid of the last transaction, or ltid if the index is empty.
    """

    read = file.read
    seek = file.seek
    seek(0, 2)
    file_size = file.tell()
    fmt = TempFormatter(file)

    if file_size:
        if file_size < start:
            raise FileStorageFormatError(file.name)
        seek(0)
        if read(4) != packed_version:
            raise FileStorageFormatError(name)
    else:
        if not read_only:
            file.write(packed_version)
        return 4, z64, ltid

    index_get = index.get

    pos = start
    seek(start)
    tid = b'\0' * 7 + b'\1'

    while 1:
        # Read the transaction record
        h = read(TRANS_HDR_LEN)
        if not h:
            break
        if len(h) != TRANS_HDR_LEN:
            if not read_only:
                logger.warning('%s truncated at %s', name, pos)
                seek(pos)
                file.truncate()
            break

        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        status = as_text(status)

        if tid <= ltid:
            logger.warning("%s time-stamp reduction at %s", name, pos)
        ltid = tid

        if pos + (tl + 8) > file_size or status == 'c':
            # Hm, the data were truncated or the checkpoint flag wasn't
            # cleared.  They may also be corrupted,
            # in which case, we don't want to totally lose the data.
            if not read_only:
                logger.warning("%s truncated, possibly due to damaged"
                               " records at %s", name, pos)
                _truncate(file, name, pos)
            break

        if status not in ' up':
            logger.warning('%s has invalid status, %s, at %s',
                           name, status, pos)

        if tl < TRANS_HDR_LEN + ul + dl + el:
            # We're in trouble.  Find out if this is bad data in the
            # middle of the file, or just a turd that Win 9x dropped
            # at the end when the system crashed.
            # Skip to the end and read what should be the transaction length
            # of the last transaction.
            seek(-8, 2)
            rtl = u64(read(8))
            # Now check to see if the redundant transaction length is
            # reasonable:
            if file_size - rtl < pos or rtl < TRANS_HDR_LEN:
                logger.critical('%s has invalid transaction header at %s',
                                name, pos)
                if not read_only:
                    logger.warning(
                        "It appears that there is invalid data at the end of "
                        "the file, possibly due to a system crash.  %s "
                        "truncated to recover from bad data at end."
                        % name)
                    _truncate(file, name, pos)
                break
            else:
                if recover:
                    return pos, None, None
                panic('%s has invalid transaction header at %s', name, pos)

        if tid >= stop:
            break

        tpos = pos
        tend = tpos + tl

        if status == 'u':
            # Undone transaction, skip it
            seek(tend)
            h = u64(read(8))
            if h != tl:
                if recover:
                    return tpos, None, None
                panic('%s has inconsistent transaction length at %s',
                      name, pos)
            pos = tend + 8
            continue

        pos = tpos + TRANS_HDR_LEN + ul + dl + el
        while pos < tend:
            # Read the data records for this transaction
            h = fmt._read_data_header(pos)
            dlen = h.recordlen()
            tindex[h.oid] = pos

            if pos + dlen > tend or h.tloc != tpos:
                if recover:
                    return tpos, None, None
                panic("%s data record exceeds transaction record at %s",
                      name, pos)

            if index_get(h.oid, 0) != h.prev:
                if h.prev:
                    if recover:
                        return tpos, None, None
                    logger.error("%s incorrect previous pointer at %s",
                                 name, pos)
                else:
                    logger.warning("%s incorrect previous pointer at %s",
                                   name, pos)

            pos += dlen

        if pos != tend:
            if recover:
                return tpos, None, None
            panic("%s data records don't add up at %s", name, tpos)

        # Read the (intentionally redundant) transaction length
        seek(pos)
        h = u64(read(8))
        if h != tl:
            if recover:
                return tpos, None, None
            panic("%s redundant transaction length check failed at %s",
                  name, pos)
        pos += 8

        index.update(tindex)
        tindex.clear()

    # Caution:  fsIndex doesn't have an efficient __nonzero__ or __len__.
    # That's why we do try/except instead.  fsIndex.maxKey() is fast.
    try:
        maxoid = index.maxKey()
    except ValueError:
        # The index is empty.
        pass  # maxoid is already equal to z64

    return pos, maxoid, ltid
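
# Sketch: rebuilding an index from scratch with read_index() (assumes an
# existing Data.fs; illustrative only):
#
#     from ZODB.fsIndex import fsIndex
#
#     with open('Data.fs', 'rb') as f:
#         index, tindex = fsIndex(), {}
#         pos, maxoid, ltid = read_index(f, 'Data.fs', index, tindex,
#                                        read_only=1)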


def _truncate(file, name, pos):
    file.seek(0, 2)
    file_size = file.tell()
    try:
        i = 0
        while 1:
            oname = '%s.tr%s' % (name, i)
            if os.path.exists(oname):
                i += 1
            else:
                logger.warning("Writing truncated data from %s to %s",
                               name, oname)
                o = open(oname, 'wb')
                file.seek(pos)
                cp(file, o, file_size - pos)
                o.close()
                break
    except:
        logger.exception("couldn't write truncated data for %s", name)
        raise StorageSystemError("Couldn't save truncated data")

    file.seek(pos)
    file.truncate()


class FileIterator(FileStorageFormatter):
    """Iterate over the transactions in a FileStorage file.
    """
    _ltid = z64
    _file = None

    def __init__(self, filename, start=None, stop=None, pos=4):
        assert isinstance(filename, STRING_TYPES)
        file = open(filename, 'rb')
        self._file = file
        self._file_name = filename
        if file.read(4) != packed_version:
            raise FileStorageFormatError(file.name)
        file.seek(0, 2)
        self._file_size = file.tell()
        if (pos < 4) or pos > self._file_size:
            raise ValueError("Given position is greater than the file size",
                             pos, self._file_size)
        self._pos = pos
        assert start is None or isinstance(start, bytes)
        assert stop is None or isinstance(stop, bytes)
        self._start = start
        self._stop = stop
        if start:
            if self._file_size <= 4:
                return
            self._skip_to_start(start)

    def __len__(self):
        # Define a bogus __len__() to make the iterator work
        # with code like builtin list() and tuple() in Python 2.1.
        # There's a lot of C code that expects a sequence to have
        # an __len__() but can cope with any sort of mistake in its
        # implementation.  So just return 0.
        return 0

    # This allows us to pass an iterator as the `other` argument to
    # copyTransactionsFrom() in BaseStorage.  The advantage here is that we
    # can create the iterator manually, e.g. setting start and stop, and then
    # just let copyTransactionsFrom() do its thing.
    def iterator(self):
        return self

    def close(self):
        file = self._file
        if file is not None:
            self._file = None
            file.close()

    def _skip_to_start(self, start):
        file = self._file
        pos1 = self._pos
        file.seek(pos1)
        tid1 = file.read(8)  # XXX bytes
        if len(tid1) < 8:
            raise CorruptedError("Couldn't read tid.")
        if start < tid1:
            pos2 = pos1
            tid2 = tid1
            file.seek(4)
            tid1 = file.read(8)
            if start <= tid1:
                self._pos = 4
                return
            pos1 = 4
        else:
            if start == tid1:
                return

            # Try to read the last transaction.  We could be unlucky and
            # opened the file while committing a transaction.  In that
            # case, we'll just scan from the beginning if the file is
            # small enough, otherwise we'll fail.
            file.seek(self._file_size - 8)
            l = u64(file.read(8))
            if not (l + 12 <= self._file_size and
                    self._read_num(self._file_size - l) == l):
                if self._file_size < (1 << 20):
                    return self._scan_forward(pos1, start)
                raise ValueError("Can't find last transaction in large file")
            pos2 = self._file_size - l - 8
            file.seek(pos2)
            tid2 = file.read(8)
            if tid2 < tid1:
                raise CorruptedError("Tids out of order.")
            if tid2 <= start:
                if tid2 == start:
                    self._pos = pos2
                else:
                    self._pos = self._file_size
                return

        t1 = TimeStamp(tid1).timeTime()
        t2 = TimeStamp(tid2).timeTime()
        ts = TimeStamp(start).timeTime()
        if (ts - t1) < (t2 - ts):
            return self._scan_forward(pos1, start)
        else:
            return self._scan_backward(pos2, start)

    def _scan_forward(self, pos, start):
        logger.debug("Scan forward %s:%s looking for %r",
                     self._file_name, pos, start)
        file = self._file
        while 1:
            # Read the transaction record
            h = self._read_txn_header(pos)
            if h.tid >= start:
                self._pos = pos
                return

            pos += h.tlen + 8

    def _scan_backward(self, pos, start):
        logger.debug("Scan backward %s:%s looking for %r",
                     self._file_name, pos, start)
        file = self._file
        seek = file.seek
        read = file.read
        while 1:
            pos -= 8
            seek(pos)
            tlen = u64(read(8))
            pos -= tlen
            h = self._read_txn_header(pos)
            if h.tid <= start:
                if h.tid == start:
                    self._pos = pos
                else:
                    self._pos = pos + tlen + 8
                return

    # Iterator protocol
    def __iter__(self):
        return self

    def __next__(self):
        if self._file is None:
            raise StopIteration()

        pos = self._pos
        while True:

            # Read the transaction record
            try:
                h = self._read_txn_header(pos)
            except CorruptedDataError as err:
                # If buf is empty, we've reached EOF.
                if not err.buf:
                    break
                raise

            if h.tid <= self._ltid:
                logger.warning("%s time-stamp reduction at %s",
                               self._file.name, pos)
            self._ltid = h.tid

            if self._stop is not None and h.tid > self._stop:
                break

            if h.status == "c":
                # Assume we've hit the last, in-progress transaction
                break

            if pos + h.tlen + 8 > self._file_size:
                # Hm, the data were truncated or the checkpoint flag wasn't
                # cleared.  They may also be corrupted,
                # in which case, we don't want to totally lose the data.
                logger.warning("%s truncated, possibly due to"
                               " damaged records at %s", self._file.name, pos)
                break

            if h.status not in " up":
                logger.warning('%s has invalid status,'
                               ' %s, at %s', self._file.name, h.status, pos)

            if h.tlen < h.headerlen():
                # We're in trouble.  Find out if this is bad data in
                # the middle of the file, or just a turd that Win 9x
                # dropped at the end when the system crashed.  Skip to
                # the end and read what should be the transaction
                # length of the last transaction.
                self._file.seek(-8, 2)
                rtl = u64(self._file.read(8))
                # Now check to see if the redundant transaction length is
                # reasonable:
                if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
                    logger.critical("%s has invalid transaction header at %s",
                                    self._file.name, pos)
                    logger.warning(
                        "It appears that there is invalid data at the end of "
                        "the file, possibly due to a system crash.  %s "
                        "truncated to recover from bad data at end."
                        % self._file.name)
                    break
                else:
                    logger.warning("%s has invalid transaction header at %s",
                                   self._file.name, pos)
                    break

            tpos = pos
            tend = tpos + h.tlen

            if h.status != "u":
                pos = tpos + h.headerlen()
                result = TransactionRecord(h.tid, h.status, h.user, h.descr,
                                           h.ext, pos, tend, self._file, tpos)

            # Read the (intentionally redundant) transaction length
            self._file.seek(tend)
            rtl = u64(self._file.read(8))
            if rtl != h.tlen:
                logger.warning("%s redundant transaction length check"
                               " failed at %s", self._file.name, tend)
                break
            self._pos = tend + 8

            return result

        self.close()
        raise StopIteration()

    next = __next__
%s " 1973 "truncated to recover from bad data at end." 1974 % self._file.name) 1975 break 1976 else: 1977 logger.warning("%s has invalid transaction header at %s", 1978 self._file.name, pos) 1979 break 1980 1981 tpos = pos 1982 tend = tpos + h.tlen 1983 1984 if h.status != "u": 1985 pos = tpos + h.headerlen() 1986 result = TransactionRecord(h.tid, h.status, h.user, h.descr, 1987 h.ext, pos, tend, self._file, tpos) 1988 1989 # Read the (intentionally redundant) transaction length 1990 self._file.seek(tend) 1991 rtl = u64(self._file.read(8)) 1992 if rtl != h.tlen: 1993 logger.warning("%s redundant transaction length check" 1994 " failed at %s", self._file.name, tend) 1995 break 1996 self._pos = tend + 8 1997 1998 return result 1999 2000 self.close() 2001 raise StopIteration() 2002 2003 next = __next__ 2004 2005 2006class TransactionRecord(_TransactionRecord): 2007 2008 def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos): 2009 _TransactionRecord.__init__( 2010 self, tid, status, user, desc, ext) 2011 self._pos = pos 2012 self._tend = tend 2013 self._file = file 2014 self._tpos = tpos 2015 2016 def __iter__(self): 2017 return TransactionRecordIterator(self) 2018 2019class TransactionRecordIterator(FileStorageFormatter): 2020 """Iterate over the transactions in a FileStorage file.""" 2021 2022 def __init__(self, record): 2023 self._file = record._file 2024 self._pos = record._pos 2025 self._tpos = record._tpos 2026 self._tend = record._tend 2027 2028 def __iter__(self): 2029 return self 2030 2031 def __next__(self): 2032 pos = self._pos 2033 while pos < self._tend: 2034 # Read the data records for this transaction 2035 h = self._read_data_header(pos) 2036 dlen = h.recordlen() 2037 2038 if pos + dlen > self._tend or h.tloc != self._tpos: 2039 logger.warning("%s data record exceeds transaction" 2040 " record at %s", file.name, pos) 2041 break 2042 2043 self._pos = pos + dlen 2044 prev_txn = None 2045 if h.plen: 2046 data = self._file.read(h.plen) 2047 else: 2048 if h.back == 0: 2049 # If the backpointer is 0, then this transaction 2050 # undoes the object creation. It undid the 2051 # transaction that created it. Return None 2052 # instead of a pickle to indicate this. 2053 data = None 2054 else: 2055 data, tid = self._loadBackTxn(h.oid, h.back, False) 2056 # Caution: :ooks like this only goes one link back. 2057 # Should it go to the original data like BDBFullStorage? 2058 prev_txn = self.getTxnFromData(h.oid, h.back) 2059 2060 return Record(h.oid, h.tid, data, prev_txn, pos) 2061 2062 raise StopIteration() 2063 2064 next = __next__ 2065 2066 2067class Record(_DataRecord): 2068 2069 def __init__(self, oid, tid, data, prev, pos): 2070 super(Record, self).__init__(oid, tid, data, prev) 2071 self.pos = pos 2072 2073 2074class UndoSearch(object): 2075 2076 def __init__(self, file, pos, first, last, filter=None): 2077 self.file = file 2078 self.pos = pos 2079 self.first = first 2080 self.last = last 2081 self.filter = filter 2082 # self.i is the index of the transaction we're _going_ to find 2083 # next. When it reaches self.first, we should start appending 2084 # to self.results. When it reaches self.last, we're done 2085 # (although we may finish earlier). 2086 self.i = 0 2087 self.results = [] 2088 self.stop = False 2089 2090 def finished(self): 2091 """Return True if UndoSearch has found enough records.""" 2092 # BAW: Why 39 please? This makes no sense (see also below). 


class UndoSearch(object):

    def __init__(self, file, pos, first, last, filter=None):
        self.file = file
        self.pos = pos
        self.first = first
        self.last = last
        self.filter = filter
        # self.i is the index of the transaction we're _going_ to find
        # next.  When it reaches self.first, we should start appending
        # to self.results.  When it reaches self.last, we're done
        # (although we may finish earlier).
        self.i = 0
        self.results = []
        self.stop = False

    def finished(self):
        """Return True if UndoSearch has found enough records."""
        # BAW: Why 39 please?  This makes no sense (see also below).
        return self.i >= self.last or self.pos < 39 or self.stop

    def search(self):
        """Search for another record."""
        info = self._readnext()
        if info is not None and (self.filter is None or self.filter(info)):
            if self.i >= self.first:
                self.results.append(info)
            self.i += 1

    def _readnext(self):
        """Read the next record from the storage."""
        self.file.seek(self.pos - 8)
        self.pos -= u64(self.file.read(8)) + 8
        self.file.seek(self.pos)
        h = self.file.read(TRANS_HDR_LEN)
        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        status = as_text(status)
        if status == 'p':
            self.stop = True
            return None
        if status != ' ':
            return None
        d = u = b''
        if ul:
            u = self.file.read(ul)
        if dl:
            d = self.file.read(dl)
        e = {}
        if el:
            try:
                e = loads(self.file.read(el))
            except Exception:
                pass
        # Rebind d to the result mapping; 'description' keeps the raw
        # description bytes read above.
        d = {'id': encodebytes(tid).rstrip(),
             'time': TimeStamp(tid).timeTime(),
             'user_name': u,
             'size': tl,
             'description': d}
        d.update(e)
        return d


class FilePool(object):

    closed = False
    writing = False
    writers = 0

    def __init__(self, file_name):
        self.name = file_name
        self._files = []
        self._out = []
        self._cond = utils.Condition()

    @contextlib.contextmanager
    def write_lock(self):
        with self._cond:
            self.writers += 1
            while self.writing or self._out:
                self._cond.wait()
            if self.closed:
                raise ValueError('closed')
            self.writing = True

        try:
            yield None
        finally:
            with self._cond:
                self.writing = False
                if self.writers > 0:
                    self.writers -= 1
                self._cond.notifyAll()

    @contextlib.contextmanager
    def get(self):
        with self._cond:
            while self.writers:
                self._cond.wait()
            assert not self.writing
            if self.closed:
                raise ValueError('closed')

            try:
                f = self._files.pop()
            except IndexError:
                f = open(self.name, 'rb')
            self._out.append(f)

        try:
            yield f
        finally:
            self._out.remove(f)
            self._files.append(f)
            if not self._out:
                with self._cond:
                    if self.writers and not self._out:
                        self._cond.notifyAll()

    def empty(self):
        while self._files:
            self._files.pop().close()

    def flush(self):
        """Empty read buffers.

        This is required if they contain data of rolled back transactions.
        """
        # Unfortunately, Python 3.x has no API to flush read buffers, and
        # the API is ineffective in Python 2 on Mac OS X.
        with self.write_lock():
            self.empty()

    def close(self):
        with self._cond:
            self.closed = True
            while self._out:
                self._out.pop().close()
            self.empty()
            self.writing = self.writers = 0
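

# Usage sketch (hypothetical, not part of the original module): readers
# borrow a shared handle with get(); write_lock() gives a writer
# exclusive access, waiting for outstanding readers to finish first.
# The file name below is made up for illustration.
def _example_filepool(path='Data.fs'):  # pragma: no cover
    pool = FilePool(path)
    try:
        with pool.get() as f:      # shared read access
            f.seek(0)
            magic = f.read(4)
        with pool.write_lock():    # exclusive; no readers outstanding
            pool.empty()           # e.g. drop handles after a rollback
        return magic
    finally:
        pool.close()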