1##############################################################################
2#
3# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
4# All Rights Reserved.
5#
6# This software is subject to the provisions of the Zope Public License,
7# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11# FOR A PARTICULAR PURPOSE
12#
13##############################################################################
14"""Storage implementation using a log written to a single file.
15"""
16from __future__ import print_function
17
18import binascii
19import contextlib
20import errno
21import logging
22import os
23import time
24from struct import pack
25from struct import unpack
26
27from persistent.TimeStamp import TimeStamp
28from six import string_types as STRING_TYPES
29from zc.lockfile import LockFile
30from zope.interface import alsoProvides
31from zope.interface import implementer
32
33from .. import utils
34
35from ZODB.blob import BlobStorageMixin
36from ZODB.blob import link_or_copy
37from ZODB.blob import remove_committed
38from ZODB.blob import remove_committed_dir
39from ZODB.BaseStorage import BaseStorage
40from ZODB.BaseStorage import DataRecord as _DataRecord
41from ZODB.BaseStorage import TransactionRecord as _TransactionRecord
42from ZODB.ConflictResolution import ConflictResolvingStorage
43from ZODB.FileStorage.format import CorruptedDataError
44from ZODB.FileStorage.format import CorruptedError
45from ZODB.FileStorage.format import DATA_HDR
46from ZODB.FileStorage.format import DATA_HDR_LEN
47from ZODB.FileStorage.format import DataHeader
48from ZODB.FileStorage.format import FileStorageFormatter
49from ZODB.FileStorage.format import TRANS_HDR
50from ZODB.FileStorage.format import TRANS_HDR_LEN
51from ZODB.FileStorage.format import TxnHeader
52from ZODB.FileStorage.fspack import FileStoragePacker
53from ZODB.interfaces import IBlobStorageRestoreable
54from ZODB.interfaces import IExternalGC
55from ZODB.interfaces import IStorage
56from ZODB.interfaces import IStorageCurrentRecordIteration
57from ZODB.interfaces import IStorageIteration
58from ZODB.interfaces import IStorageRestoreable
59from ZODB.interfaces import IStorageUndoable
60from ZODB.POSException import ConflictError
61from ZODB.POSException import MultipleUndoErrors
62from ZODB.POSException import POSKeyError
63from ZODB.POSException import ReadOnlyError
64from ZODB.POSException import StorageError
65from ZODB.POSException import StorageSystemError
66from ZODB.POSException import StorageTransactionError
67from ZODB.POSException import UndoError
68from ZODB.fsIndex import fsIndex
69from ZODB.utils import as_bytes
70from ZODB.utils import as_text
71from ZODB.utils import cp
72from ZODB.utils import load_current
73from ZODB.utils import mktemp
74from ZODB.utils import p64
75from ZODB.utils import u64
76from ZODB.utils import z64
77from ZODB._compat import Pickler
78from ZODB._compat import loads
79from ZODB._compat import decodebytes
80from ZODB._compat import encodebytes
81from ZODB._compat import _protocol
82from ZODB._compat import FILESTORAGE_MAGIC
83
84
85# Not all platforms have fsync
86fsync = getattr(os, "fsync", None)
87
88packed_version = FILESTORAGE_MAGIC
89
90logger = logging.getLogger('ZODB.FileStorage')
91
92def panic(message, *data):
93    logger.critical(message, *data)
94    raise CorruptedTransactionError(message % data)
95
96class FileStorageError(StorageError):
97    pass
98
99class PackError(FileStorageError):
100    pass
101
102class FileStorageFormatError(FileStorageError):
103    """Invalid file format
104
105    The format of the given file is not valid.
106    """
107
108class CorruptedFileStorageError(FileStorageError,
109                                StorageSystemError):
110    """Corrupted file storage."""
111
112class CorruptedTransactionError(CorruptedFileStorageError):
113    pass
114
115class FileStorageQuotaError(FileStorageError,
116                            StorageSystemError):
117    """File storage quota exceeded."""
118
119# Intended to be raised only in fspack.py, and ignored here.
120class RedundantPackWarning(FileStorageError):
121    pass
122
123class TempFormatter(FileStorageFormatter):
124    """Helper class used to read formatted FileStorage data."""
125
126    def __init__(self, afile):
127        self._file = afile
128
129@implementer(
130        IStorageRestoreable,
131        IStorageIteration,
132        IStorageUndoable,
133        IStorageCurrentRecordIteration,
134        IExternalGC,
135        IStorage,
136        )
137class FileStorage(
138    FileStorageFormatter,
139    BlobStorageMixin,
140    ConflictResolvingStorage,
141    BaseStorage,
142    ):
143    """Storage that saves data in a file
144    """
145
146    # Set True while a pack is in progress; undo is blocked for the duration.
147    _pack_is_in_progress = False
148
149    def __init__(self, file_name, create=False, read_only=False, stop=None,
150                 quota=None, pack_gc=True, pack_keep_old=True, packer=None,
151                 blob_dir=None):
152        """Create a file storage
153
154        :param str file_name: Path to store data file
155        :param bool create: Flag indicating whether a file should be
156            created even if it already exists.
157        :param bool read_only: Flag indicating whether the file is
158            read only. Only one process is able to open the file
159            non-read-only.
160        :param bytes stop: Time-travel transaction id
161            When the file is opened, data will be read up to the given
162            transaction id.  Transaction ids correspond to times and
163            you can compute transaction ids for a given time using
164            :class:`~ZODB.TimeStamp.TimeStamp`.
165        :param int quota: File-size quota
166        :param bool pack_gc: Flag indicating whether garbage
167            collection should be performed when packing.
168        :param bool pack_keep_old: flag indicating whether old data
169            files should be retained after packing as a ``.old`` file.
170        :param callable packer: An alternative
171           :interface:`packer <ZODB.FileStorage.interfaces.IFileStoragePacker>`.
172        :param str blob_dir: A blob-directory path name.
173           Blobs will be supported if this option is provided.
174
175        A file storage stores data in a single file that behaves like
176        a traditional transaction log. New data records are appended
177        to the end of the file.  Periodically, the file is packed to
178        free up space.  When this is done, current records as of the
179        pack time or later are copied to a new file, which replaces
180        the old file.
181
182        FileStorages keep in-memory indexes mapping object oids to the
183        location of their current records in the file. Back pointers to
184        previous records allow access to non-current records from the
185        current records.
186
187        In addition to the data file, some ancillary files are
188        created. These can be lost without affecting data
189        integrity, however losing the index file may cause extremely
190        slow startup. Each has a name that's a concatenation of the
191        original file and a suffix. The files are listed below by
192        suffix:
193
194        .index
195           Snapshot of the in-memory index.  This are created on
196           shutdown, packing, and after rebuilding an index when one
197           was not found.  For large databases, creating a
198           file-storage object without an index file can take very
199           long because it's necessary to scan the data file to build
200           the index.
201
202        .lock
203           A lock file preventing multiple processes from opening a
204           file storage on non-read-only mode.
205
206        .tmp
207          A file used to store data being committed in the first phase
208          of 2-phase commit
209
210        .index_tmp
211          A temporary file used when saving the in-memory index to
212          avoid overwriting an existing index until a new index has
213          been fully saved.
214
215        .pack
216          A temporary file written while packing containing current
217          records as of and after the pack time.
218
219        .old
220          The previous database file after a pack.
221
222        When the database is packed, current records as of the pack
223        time and later are written to the ``.pack`` file. At the end
224        of packing, the ``.old`` file is removed, if it exists, and
225        the data file is renamed to the ``.old`` file and finally the
226        ``.pack`` file is rewritten to the data file.
227        """
228
229        if read_only:
230            self._is_read_only = True
231            if create:
232                raise ValueError("can't create a read-only file")
233        elif stop is not None:
234            raise ValueError("time-travel only supported in read-only mode")
235
236        if stop is None:
237            stop = b'\377'*8
238
239        # Lock the database and set up the temp file.
240        if not read_only:
241            # Create the lock file
242            self._lock_file = LockFile(file_name + '.lock')
243            self._tfile = open(file_name + '.tmp', 'w+b')
244            self._tfmt = TempFormatter(self._tfile)
245        else:
246            self._tfile = None
247
248        self._file_name = os.path.abspath(file_name)
249
250        self._pack_gc = pack_gc
251        self.pack_keep_old = pack_keep_old
252        if packer is not None:
253            self.packer = packer
254
255        BaseStorage.__init__(self, file_name)
256
257        index, tindex = self._newIndexes()
258        self._initIndex(index, tindex)
259
260        # Now open the file
261
262        self._file = None
263        if not create:
264            try:
265                self._file = open(file_name, read_only and 'rb' or 'r+b')
266            except IOError as exc:
267                if exc.errno == errno.EFBIG:
268                    # The file is too big to open.  Fail visibly.
269                    raise
270                if read_only:
271                    # When open request is read-only we do not want to create
272                    # the file
273                    raise
274                if exc.errno == errno.ENOENT:
275                    # The file doesn't exist.  Create it.
276                    create = 1
277                # If something else went wrong, it's hard to guess
278                # what the problem was.  If the file does not exist,
279                # create it.  Otherwise, fail.
280                if os.path.exists(file_name):
281                    raise
282                else:
283                    create = 1
284
285        if self._file is None and create:
286            if os.path.exists(file_name):
287                os.remove(file_name)
288            self._file = open(file_name, 'w+b')
289            self._file.write(packed_version)
290
291        self._files = FilePool(self._file_name)
292        r = self._restore_index()
293        if r is not None:
294            self._used_index = 1 # Marker for testing
295            index, start, ltid = r
296
297            self._initIndex(index, tindex)
298            self._pos, self._oid, tid = read_index(
299                self._file, file_name, index, tindex, stop,
300                ltid=ltid, start=start, read_only=read_only,
301                )
302        else:
303            self._used_index = 0 # Marker for testing
304            self._pos, self._oid, tid = read_index(
305                self._file, file_name, index, tindex, stop,
306                read_only=read_only,
307                )
308            self._save_index()
309
310        self._ltid = tid
311
312        # self._pos should always point just past the last
313        # transaction.  During 2PC, data is written after _pos.
314        # invariant is restored at tpc_abort() or tpc_finish().
315
316        self._ts = tid = TimeStamp(tid)
317        t = time.time()
318        t = TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
319        if tid > t:
320            seconds = tid.timeTime() - t.timeTime()
321            complainer = logger.warning
322            if seconds > 30 * 60:   # 30 minutes -- way screwed up
323                complainer = logger.critical
324            complainer("%s Database records %d seconds in the future",
325                       file_name, seconds)
326
327        self._quota = quota
328
329        if blob_dir:
330            self.blob_dir = os.path.abspath(blob_dir)
331            if create and os.path.exists(self.blob_dir):
332                remove_committed_dir(self.blob_dir)
333
334            self._blob_init(blob_dir)
335            alsoProvides(self, IBlobStorageRestoreable)
336        else:
337            self.blob_dir = None
338            self._blob_init_no_blobs()
339
340    def copyTransactionsFrom(self, other):
341        if self.blob_dir:
342            return BlobStorageMixin.copyTransactionsFrom(self, other)
343        else:
344            return BaseStorage.copyTransactionsFrom(self, other)
345
346    def _initIndex(self, index, tindex):
347        self._index=index
348        self._tindex=tindex
349        self._index_get=index.get
350
351    def __len__(self):
352        return len(self._index)
353
354    def _newIndexes(self):
355        # hook to use something other than builtin dict
356        return fsIndex(), {}
357
358    _saved = 0
359    def _save_index(self):
360        """Write the database index to a file to support quick startup."""
361
362        if self._is_read_only:
363            return
364
365        index_name = self.__name__ + '.index'
366        tmp_name = index_name + '.index_tmp'
367
368        self._index.save(self._pos, tmp_name)
369
370        try:
371            try:
372                os.remove(index_name)
373            except OSError:
374                pass
375            os.rename(tmp_name, index_name)
376        except: pass
377
378        self._saved += 1
379
380    def _clear_index(self):
381        index_name = self.__name__ + '.index'
382        if os.path.exists(index_name):
383            try:
384                os.remove(index_name)
385            except OSError:
386                pass
387
388    def _sane(self, index, pos):
389        """Sanity check saved index data by reading the last undone trans
390
391        Basically, we read the last not undone transaction and
392        check to see that the included records are consistent
393        with the index.  Any invalid record records or inconsistent
394        object positions cause zero to be returned.
395        """
396        r = self._check_sanity(index, pos)
397        if not r:
398            logger.warning("Ignoring index for %s", self._file_name)
399        return r
400
401    def _check_sanity(self, index, pos):
402
403        if pos < 100:
404            return 0 # insane
405        self._file.seek(0, 2)
406        if self._file.tell() < pos:
407            return 0 # insane
408        ltid = None
409
410        max_checked = 5
411        checked = 0
412
413        while checked < max_checked:
414            self._file.seek(pos - 8)
415            rstl = self._file.read(8)
416            tl = u64(rstl)
417            pos = pos - tl - 8
418            if pos < 4:
419                return 0 # insane
420            h = self._read_txn_header(pos)
421            if not ltid:
422                ltid = h.tid
423            if h.tlen != tl:
424                return 0 # inconsistent lengths
425            if h.status == 'u':
426                continue # undone trans, search back
427            if h.status not in ' p':
428                return 0 # insane
429            if tl < h.headerlen():
430                return 0 # insane
431            tend = pos + tl
432            opos = pos + h.headerlen()
433            if opos == tend:
434                continue # empty trans
435
436            while opos < tend and checked < max_checked:
437                # Read the data records for this transaction
438                h = self._read_data_header(opos)
439
440                if opos + h.recordlen() > tend or h.tloc != pos:
441                    return 0
442
443                if index.get(h.oid, 0) != opos:
444                    return 0 # insane
445
446                checked += 1
447
448                opos = opos + h.recordlen()
449
450            return ltid
451
452    def _restore_index(self):
453        """Load database index to support quick startup."""
454        # Returns (index, pos, tid), or None in case of error.
455        # The index returned is always an instance of fsIndex.  If the
456        # index cached in the file is a Python dict, it's converted to
457        # fsIndex here, and, if we're not in read-only mode, the .index
458        # file is rewritten with the converted fsIndex so we don't need to
459        # convert it again the next time.
460        file_name=self.__name__
461        index_name=file_name+'.index'
462
463        if os.path.exists(index_name):
464            try:
465                info = fsIndex.load(index_name)
466            except:
467                logger.exception('loading index')
468                return None
469        else:
470            return None
471
472        index = info.get('index')
473        pos = info.get('pos')
474        if index is None or pos is None:
475            return None
476        pos = int(pos)
477
478        if (isinstance(index, dict) or
479                (isinstance(index, fsIndex) and
480                 isinstance(index._data, dict))):
481            # Convert dictionary indexes to fsIndexes *or* convert fsIndexes
482            # which have a dict `_data` attribute to a new fsIndex (newer
483            # fsIndexes have an OOBTree as `_data`).
484            newindex = fsIndex()
485            newindex.update(index)
486            index = newindex
487            if not self._is_read_only:
488                # Save the converted index.
489                f = open(index_name, 'wb')
490                p = Pickler(f, _protocol)
491                info['index'] = index
492                p.dump(info)
493                f.close()
494                # Now call this method again to get the new data.
495                return self._restore_index()
496
497        tid = self._sane(index, pos)
498        if not tid:
499            return None
500
501        return index, pos, tid
502
503    def close(self):
504        self._file.close()
505        self._files.close()
506        if hasattr(self,'_lock_file'):
507            self._lock_file.close()
508        if self._tfile:
509            self._tfile.close()
510        try:
511            self._save_index()
512        except:
513            # Log the error and continue
514            logger.exception("Error saving index on close()")
515
516    def getSize(self):
517        return self._pos
518
519    def _lookup_pos(self, oid):
520        try:
521            return self._index[oid]
522        except KeyError:
523            raise POSKeyError(oid)
524        except TypeError:
525            raise TypeError("invalid oid %r" % (oid,))
526
527    load = load_current # Keep load for now for old clients
528
529    def load(self, oid, version=''):
530        """Return pickle data and serial number."""
531        assert not version
532
533        with self._files.get() as _file:
534            pos = self._lookup_pos(oid)
535            h = self._read_data_header(pos, oid, _file)
536            if h.plen:
537                data = _file.read(h.plen)
538                return data, h.tid
539            elif h.back:
540                # Get the data from the backpointer, but tid from
541                # current txn.
542                data = self._loadBack_impl(oid, h.back, _file=_file)[0]
543                return data, h.tid
544            else:
545                raise POSKeyError(oid)
546
547    def loadSerial(self, oid, serial):
548        with self._lock:
549            pos = self._lookup_pos(oid)
550            while 1:
551                h = self._read_data_header(pos, oid)
552                if h.tid == serial:
553                    break
554                pos = h.prev
555                if h.tid < serial or not pos:
556                    raise POSKeyError(oid)
557            if h.plen:
558                return self._file.read(h.plen)
559            else:
560                return self._loadBack_impl(oid, h.back)[0]
561
562    def loadBefore(self, oid, tid):
563        with self._files.get() as _file:
564            pos = self._lookup_pos(oid)
565            end_tid = None
566            while True:
567                h = self._read_data_header(pos, oid, _file)
568                if h.tid < tid:
569                    break
570
571                pos = h.prev
572                end_tid = h.tid
573                if not pos:
574                    return None
575
576            if h.plen:
577                return _file.read(h.plen), h.tid, end_tid
578            elif h.back:
579                data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
580                return data, h.tid, end_tid
581            else:
582                raise POSKeyError(oid)
583
584    def store(self, oid, oldserial, data, version, transaction):
585        if self._is_read_only:
586            raise ReadOnlyError()
587        if transaction is not self._transaction:
588            raise StorageTransactionError(self, transaction)
589        assert not version
590
591        with self._lock:
592            if oid > self._oid:
593                self.set_max_oid(oid)
594            old = self._index_get(oid, 0)
595            committed_tid = None
596            pnv = None
597            if old:
598                h = self._read_data_header(old, oid)
599                committed_tid = h.tid
600
601                if oldserial != committed_tid:
602                    data = self.tryToResolveConflict(oid, committed_tid,
603                                                     oldserial, data)
604                    self._resolved.append(oid)
605
606            pos = self._pos
607            here = pos + self._tfile.tell() + self._thl
608            self._tindex[oid] = here
609            new = DataHeader(oid, self._tid, old, pos, 0, len(data))
610
611            self._tfile.write(new.asString())
612            self._tfile.write(data)
613
614            # Check quota
615            if self._quota is not None and here > self._quota:
616                raise FileStorageQuotaError(
617                    "The storage quota has been exceeded.")
618
619    def deleteObject(self, oid, oldserial, transaction):
620        if self._is_read_only:
621            raise ReadOnlyError()
622        if transaction is not self._transaction:
623            raise StorageTransactionError(self, transaction)
624
625        with self._lock:
626            old = self._index_get(oid, 0)
627            if not old:
628                raise POSKeyError(oid)
629            h = self._read_data_header(old, oid)
630            committed_tid = h.tid
631
632            if oldserial != committed_tid:
633                raise ConflictError(
634                    oid=oid, serials=(committed_tid, oldserial))
635
636            pos = self._pos
637            here = pos + self._tfile.tell() + self._thl
638            self._tindex[oid] = here
639            new = DataHeader(oid, self._tid, old, pos, 0, 0)
640            self._tfile.write(new.asString())
641            self._tfile.write(z64)
642
643            # Check quota
644            if self._quota is not None and here > self._quota:
645                raise FileStorageQuotaError(
646                    "The storage quota has been exceeded.")
647
648    def _data_find(self, tpos, oid, data):
649        # Return backpointer for oid.  Must call with the lock held.
650        # This is a file offset to oid's data record if found, else 0.
651        # The data records in the transaction at tpos are searched for oid.
652        # If a data record for oid isn't found, returns 0.
653        # Else if oid's data record contains a backpointer, that
654        # backpointer is returned.
655        # Else oid's data record contains the data, and the file offset of
656        # oid's data record is returned.  This data record should contain
657        # a pickle identical to the 'data' argument.
658
659        # Unclear:  If the length of the stored data doesn't match len(data),
660        # an exception is raised.  If the lengths match but the data isn't
661        # the same, 0 is returned.  Why the discrepancy?
662        self._file.seek(tpos)
663        h = self._file.read(TRANS_HDR_LEN)
664        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
665        status = as_text(status)
666        self._file.read(ul + dl + el)
667        tend = tpos + tl + 8
668        pos = self._file.tell()
669        while pos < tend:
670            h = self._read_data_header(pos)
671            if h.oid == oid:
672                # Make sure this looks like the right data record
673                if h.plen == 0:
674                    # This is also a backpointer.  Gotta trust it.
675                    return pos
676                if h.plen != len(data):
677                    # The expected data doesn't match what's in the
678                    # backpointer.  Something is wrong.
679                    logger.error("Mismatch between data and"
680                                 " backpointer at %d", pos)
681                    return 0
682                _data = self._file.read(h.plen)
683                if data != _data:
684                    return 0
685                return pos
686            pos += h.recordlen()
687            self._file.seek(pos)
688        return 0
689
690    def restore(self, oid, serial, data, version, prev_txn, transaction):
691        # A lot like store() but without all the consistency checks.  This
692        # should only be used when we /know/ the data is good, hence the
693        # method name.  While the signature looks like store() there are some
694        # differences:
695        #
696        # - serial is the serial number of /this/ revision, not of the
697        #   previous revision.  It is used instead of self._tid, which is
698        #   ignored.
699        #
700        # - Nothing is returned
701        #
702        # - data can be None, which indicates a George Bailey object
703        #   (i.e. one who's creation has been transactionally undone).
704        #
705        # prev_txn is a backpointer.  In the original database, it's possible
706        # that the data was actually living in a previous transaction.  This
707        # can happen for transactional undo and other operations, and is used
708        # as a space saving optimization.  Under some circumstances the
709        # prev_txn may not actually exist in the target database (i.e. self)
710        # for example, if it's been packed away.  In that case, the prev_txn
711        # should be considered just a hint, and is ignored if the transaction
712        # doesn't exist.
713        if self._is_read_only:
714            raise ReadOnlyError()
715        if transaction is not self._transaction:
716            raise StorageTransactionError(self, transaction)
717        if version:
718            raise TypeError("Versions are no-longer supported")
719
720        with self._lock:
721            if oid > self._oid:
722                self.set_max_oid(oid)
723            prev_pos = 0
724            if prev_txn is not None:
725                prev_txn_pos = self._txn_find(prev_txn, 0)
726                if prev_txn_pos:
727                    prev_pos = self._data_find(prev_txn_pos, oid, data)
728            old = self._index_get(oid, 0)
729            # Calculate the file position in the temporary file
730            here = self._pos + self._tfile.tell() + self._thl
731            # And update the temp file index
732            self._tindex[oid] = here
733            if prev_pos:
734                # If there is a valid prev_pos, don't write data.
735                data = None
736            if data is None:
737                dlen = 0
738            else:
739                dlen = len(data)
740
741            # Write the recovery data record
742            new = DataHeader(oid, serial, old, self._pos, 0, dlen)
743
744            self._tfile.write(new.asString())
745
746            # Finally, write the data or a backpointer.
747            if data is None:
748                if prev_pos:
749                    self._tfile.write(p64(prev_pos))
750                else:
751                    # Write a zero backpointer, which indicates an
752                    # un-creation transaction.
753                    self._tfile.write(z64)
754            else:
755                self._tfile.write(data)
756
757    def supportsUndo(self):
758        return 1
759
760    def _clear_temp(self):
761        self._tindex.clear()
762        if self._tfile is not None:
763            self._tfile.seek(0)
764
765    def _begin(self, tid, u, d, e):
766        self._nextpos = 0
767        self._thl = TRANS_HDR_LEN + len(u) + len(d) + len(e)
768        if self._thl > 65535:
769            # one of u, d, or e may be > 65535
770            # We have to check lengths here because struct.pack
771            # doesn't raise an exception on overflow!
772            if len(u) > 65535:
773                raise FileStorageError('user name too long')
774            if len(d) > 65535:
775                raise FileStorageError('description too long')
776            if len(e) > 65535:
777                raise FileStorageError('too much extension data')
778
779    def tpc_vote(self, transaction):
780        with self._lock:
781            if transaction is not self._transaction:
782                raise StorageTransactionError(
783                    "tpc_vote called with wrong transaction")
784            dlen = self._tfile.tell()
785            self._tfile.seek(0)
786            user, descr, ext = self._ude
787
788            self._file.seek(self._pos)
789            tl = self._thl + dlen
790
791            try:
792                h = TxnHeader(self._tid, tl, "c", len(user),
793                              len(descr), len(ext))
794                h.user = user
795                h.descr = descr
796                h.ext = ext
797                self._file.write(h.asString())
798                cp(self._tfile, self._file, dlen)
799                self._file.write(p64(tl))
800                self._file.flush()
801            except:
802                # Hm, an error occurred writing out the data. Maybe the
803                # disk is full. We don't want any turd at the end.
804                self._file.truncate(self._pos)
805                self._files.flush()
806                raise
807            self._nextpos = self._pos + (tl + 8)
808            return self._resolved
809
810    def tpc_finish(self, transaction, f=None):
811        with self._files.write_lock():
812            with self._lock:
813                if transaction is not self._transaction:
814                    raise StorageTransactionError(
815                        "tpc_finish called with wrong transaction")
816                try:
817                    tid = self._tid
818                    if f is not None:
819                        f(tid)
820                    self._finish(tid, *self._ude)
821                    self._clear_temp()
822                finally:
823                    self._ude = None
824                    self._transaction = None
825                    self._commit_lock.release()
826        return tid
827
828    def _finish(self, tid, u, d, e):
829        # Clear the checkpoint flag
830        self._file.seek(self._pos+16)
831        self._file.write(as_bytes(self._tstatus))
832        try:
833            # At this point, we may have committed the data to disk.
834            # If we fail from here, we're in bad shape.
835            self._finish_finish(tid)
836        except:
837            # Ouch.  This is bad.  Let's try to get back to where we were
838            # and then roll over and die
839            logger.critical("Failure in _finish. Closing.", exc_info=True)
840            self.close()
841            raise
842
843    def _finish_finish(self, tid):
844        # This is a separate method to allow tests to replace it with
845        # something broken. :)
846
847        self._file.flush()
848        if fsync is not None:
849            fsync(self._file.fileno())
850
851        self._pos = self._nextpos
852        self._index.update(self._tindex)
853        self._ltid = tid
854        self._blob_tpc_finish()
855
856    def _abort(self):
857        if self._nextpos:
858            self._file.truncate(self._pos)
859            self._files.flush()
860            self._nextpos=0
861            self._blob_tpc_abort()
862
863    def _undoDataInfo(self, oid, pos, tpos):
864        """Return the tid, data pointer, and data for the oid record at pos
865        """
866        if tpos:
867            itpos = tpos - self._pos - self._thl
868            pos = tpos
869            tpos = self._tfile.tell()
870            h = self._tfmt._read_data_header(itpos, oid)
871            afile = self._tfile
872        else:
873            h = self._read_data_header(pos, oid)
874            afile = self._file
875
876        if h.oid != oid:
877            raise UndoError("Invalid undo transaction id", oid)
878
879        if h.plen:
880            data = afile.read(h.plen)
881        else:
882            data = ''
883            pos = h.back
884
885        if tpos:
886            self._tfile.seek(tpos) # Restore temp file to end
887
888        return h.tid, pos, data
889
890    def getTid(self, oid):
891        with self._lock:
892            pos = self._lookup_pos(oid)
893            h = self._read_data_header(pos, oid)
894            if h.plen == 0 and h.back == 0:
895                # Undone creation
896                raise POSKeyError(oid)
897            return h.tid
898
899    def _transactionalUndoRecord(self, oid, pos, tid, pre):
900        """Get the undo information for a data record
901
902        'pos' points to the data header for 'oid' in the transaction
903        being undone.  'tid' refers to the transaction being undone.
904        'pre' is the 'prev' field of the same data header.
905
906        Return a 3-tuple consisting of a pickle, data pointer, and
907        current position.  If the pickle is true, then the data
908        pointer must be 0, but the pickle can be empty *and* the
909        pointer 0.
910        """
911
912        copy = True # Can we just copy a data pointer
913
914        # First check if it is possible to undo this record.
915        tpos = self._tindex.get(oid, 0)
916        ipos = self._index.get(oid, 0)
917        tipos = tpos or ipos
918
919        if tipos != pos:
920            # The transaction being undone isn't current because:
921            # a) A later transaction was committed ipos != pos, or
922            # b) A change was made in the current transaction. This
923            #    could only be a previous undo in a multi-undo.
924            #    (We don't allow multiple data managers with the same
925            #    storage to participate in the same transaction.)
926            assert tipos > pos
927
928            # Get current data, as identified by tipos.  We'll use
929            # it to decide if and how we can undo in this case.
930            ctid, cdataptr, current_data = self._undoDataInfo(oid, ipos, tpos)
931
932            if cdataptr != pos:
933
934                # if cdataptr was == pos, then we'd be cool, because
935                # we're dealing with the same data.
936
937                # Because they aren't equal, we have to dig deeper
938
939                # Let's see if data to be undone and current data
940                # are the same. If not, we'll have to decide whether
941                # we should try conflict resolution.
942
943                try:
944                    data_to_be_undone = self._loadBack_impl(oid, pos)[0]
945                    if not current_data:
946                        current_data = self._loadBack_impl(oid, cdataptr)[0]
947
948                    if data_to_be_undone != current_data:
949                        # OK, so the current data is different from
950                        # the data being undone.  We can't just copy:
951                        copy = False
952
953                        if not pre:
954                            # The transaction we're undoing has no
955                            # previous state to merge with, so we
956                            # can't resolve a conflict.
957                            raise UndoError(
958                                "Can't undo an add transaction followed by"
959                                " conflicting transactions.", oid)
960                except KeyError:
961                    # LoadBack gave us a key error. Bail.
962                    raise UndoError("_loadBack() failed", oid)
963
964        # Return the data that should be written in the undo record.
965        if not pre:
966            # We're undoing object addition.  We're doing this because
967            # subsequent transactions has no net effect on the state
968            # (possibly because some of them were undos).
969            return "", 0, ipos
970
971        if copy:
972            # we can just copy our previous-record pointer forward
973            return "", pre, ipos
974
975        try:
976            pre_data = self._loadBack_impl(oid, pre)[0]
977        except KeyError:
978            # couldn't find oid; what's the real explanation for this?
979            raise UndoError("_loadBack() failed for %s", oid)
980
981        try:
982            data = self.tryToResolveConflict(
983                oid, ctid, tid, pre_data, current_data)
984            return data, 0, ipos
985        except ConflictError:
986            pass
987
988        raise UndoError("Some data were modified by a later transaction", oid)
989
990    # undoLog() returns a description dict that includes an id entry.
991    # The id is opaque to the client, but contains the transaction id.
992    # The transactionalUndo() implementation does a simple linear
993    # search through the file (from the end) to find the transaction.
994
995    def undoLog(self, first=0, last=-20, filter=None):
996        if last < 0:
997            # -last is supposed to be the max # of transactions.  Convert to
998            # a positive index.  Should have x - first = -last, which
999            # means x = first - last.  This is spelled out here because
1000            # the normalization code was incorrect for years (used +1
1001            # instead -- off by 1), until ZODB 3.4.
1002            last = first - last
1003        with self._lock:
1004            if self._pack_is_in_progress:
1005                raise UndoError(
1006                    'Undo is currently disabled for database maintenance.<p>')
1007            us = UndoSearch(self._file, self._pos, first, last, filter)
1008            while not us.finished():
1009                # Hold lock for batches of 20 searches, so default search
1010                # parameters will finish without letting another thread run.
1011                for i in range(20):
1012                    if us.finished():
1013                        break
1014                    us.search()
1015                # Give another thread a chance, so that a long undoLog()
1016                # operation doesn't block all other activity.
1017                self._lock.release()
1018                self._lock.acquire()
1019            return us.results
1020
1021    def undo(self, transaction_id, transaction):
1022        """Undo a transaction, given by transaction_id.
1023
1024        Do so by writing new data that reverses the action taken by
1025        the transaction.
1026
1027        Usually, we can get by with just copying a data pointer, by
1028        writing a file position rather than a pickle. Sometimes, we
1029        may do conflict resolution, in which case we actually copy
1030        new data that results from resolution.
1031        """
1032
1033        if self._is_read_only:
1034            raise ReadOnlyError()
1035        if transaction is not self._transaction:
1036            raise StorageTransactionError(self, transaction)
1037
1038        with self._lock:
1039          # Find the right transaction to undo and call _txn_undo_write().
1040          tid = decodebytes(transaction_id + b'\n')
1041          assert len(tid) == 8
1042          tpos = self._txn_find(tid, 1)
1043          tindex = self._txn_undo_write(tpos)
1044          self._tindex.update(tindex)
1045          return self._tid, tindex.keys()
1046
1047    def _txn_find(self, tid, stop_at_pack):
1048        pos = self._pos
1049        while pos > 39:
1050            self._file.seek(pos - 8)
1051            pos = pos - u64(self._file.read(8)) - 8
1052            self._file.seek(pos)
1053            h = self._file.read(TRANS_HDR_LEN)
1054            _tid = h[:8]
1055            if _tid == tid:
1056                return pos
1057            if stop_at_pack:
1058                # check the status field of the transaction header
1059                if h[16] == b'p':
1060                    break
1061        raise UndoError("Invalid transaction id")
1062
1063    def _txn_undo_write(self, tpos):
1064        # a helper function to write the data records for transactional undo
1065
1066        otloc = self._pos
1067        here = self._pos + self._tfile.tell() + self._thl
1068        base = here - self._tfile.tell()
1069        # Let's move the file pointer back to the start of the txn record.
1070        th = self._read_txn_header(tpos)
1071        if th.status != " ":
1072            raise UndoError('non-undoable transaction')
1073        tend = tpos + th.tlen
1074        pos = tpos + th.headerlen()
1075        tindex = {}
1076
1077        # keep track of failures, cause we may succeed later
1078        failures = {}
1079        # Read the data records for this transaction
1080        while pos < tend:
1081            h = self._read_data_header(pos)
1082            if h.oid in failures:
1083                del failures[h.oid] # second chance!
1084
1085            assert base + self._tfile.tell() == here, (here, base,
1086                                                       self._tfile.tell())
1087            try:
1088                p, prev, ipos = self._transactionalUndoRecord(
1089                    h.oid, pos, h.tid, h.prev)
1090            except UndoError as v:
1091                # Don't fail right away. We may be redeemed later!
1092                failures[h.oid] = v
1093            else:
1094
1095                if self.blob_dir and not p and prev:
1096                    try:
1097                        up, userial = self._loadBackTxn(h.oid, prev)
1098                    except POSKeyError:
1099                        pass # It was removed, so no need to copy data
1100                    else:
1101                        if self.is_blob_record(up):
1102                            # We're undoing a blob modification operation.
1103                            # We have to copy the blob data
1104                            tmp = mktemp(dir=self.fshelper.temp_dir)
1105                            with self.openCommittedBlobFile(
1106                                h.oid, userial) as sfp:
1107                                with open(tmp, 'wb') as dfp:
1108                                    cp(sfp, dfp)
1109                            self._blob_storeblob(h.oid, self._tid, tmp)
1110
1111                new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p))
1112
1113                # TODO:  This seek shouldn't be necessary, but some other
1114                # bit of code is messing with the file pointer.
1115                assert self._tfile.tell() == here - base, (here, base,
1116                                                           self._tfile.tell())
1117                self._tfile.write(new.asString())
1118                if p:
1119                    self._tfile.write(p)
1120                else:
1121                    self._tfile.write(p64(prev))
1122                tindex[h.oid] = here
1123                here += new.recordlen()
1124
1125            pos += h.recordlen()
1126            if pos > tend:
1127                raise UndoError("non-undoable transaction")
1128
1129        if failures:
1130            raise MultipleUndoErrors(list(failures.items()))
1131
1132        return tindex
1133
1134    def history(self, oid, size=1, filter=None):
1135        with self._lock:
1136            r = []
1137            pos = self._lookup_pos(oid)
1138
1139            while 1:
1140                if len(r) >= size: return r
1141                h = self._read_data_header(pos)
1142
1143                th = self._read_txn_header(h.tloc)
1144                if th.ext:
1145                    d = loads(th.ext)
1146                else:
1147                    d = {}
1148
1149                d.update({"time": TimeStamp(h.tid).timeTime(),
1150                          "user_name": th.user,
1151                          "description": th.descr,
1152                          "tid": h.tid,
1153                          "size": h.plen,
1154                          })
1155
1156                if filter is None or filter(d):
1157                    r.append(d)
1158
1159                if h.prev:
1160                    pos = h.prev
1161                else:
1162                    return r
1163
1164    def _redundant_pack(self, file, pos):
1165        assert pos > 8, pos
1166        file.seek(pos - 8)
1167        p = u64(file.read(8))
1168        file.seek(pos - p + 8)
1169        return file.read(1) not in ' u'
1170
1171    @staticmethod
1172    def packer(storage, referencesf, stop, gc):
1173        # Our default packer is built around the original packer.  We
1174        # simply adapt the old interface to the new.  We don't really
1175        # want to invest much in the old packer, at least for now.
1176        assert referencesf is not None
1177        p = FileStoragePacker(storage, referencesf, stop, gc)
1178        try:
1179            opos = p.pack()
1180            if opos is None:
1181                return None
1182            return opos, p.index
1183        finally:
1184            p.close()
1185
1186    def pack(self, t, referencesf, gc=None):
1187        """Copy data from the current database file to a packed file
1188
1189        Non-current records from transactions with time-stamp strings less
1190        than packtss are ommitted. As are all undone records.
1191
1192        Also, data back pointers that point before packtss are resolved and
1193        the associated data are copied, since the old records are not copied.
1194        """
1195        if self._is_read_only:
1196            raise ReadOnlyError()
1197
1198        stop = TimeStamp(*time.gmtime(t)[:5]+(t%60,)).raw()
1199        if stop == z64:
1200            raise FileStorageError('Invalid pack time')
1201
1202        # If the storage is empty, there's nothing to do.
1203        if not self._index:
1204            return
1205
1206        with self._lock:
1207            if self._pack_is_in_progress:
1208                raise FileStorageError('Already packing')
1209            self._pack_is_in_progress = True
1210
1211        if gc is None:
1212            gc = self._pack_gc
1213
1214        oldpath = self._file_name + ".old"
1215        if os.path.exists(oldpath):
1216            os.remove(oldpath)
1217        if self.blob_dir and os.path.exists(self.blob_dir + ".old"):
1218            remove_committed_dir(self.blob_dir + ".old")
1219
1220        cleanup = []
1221
1222        have_commit_lock = False
1223        try:
1224            pack_result = None
1225            try:
1226                pack_result = self.packer(self, referencesf, stop, gc)
1227            except RedundantPackWarning as detail:
1228                logger.info(str(detail))
1229            if pack_result is None:
1230                return
1231            have_commit_lock = True
1232            opos, index = pack_result
1233            with self._files.write_lock():
1234                with self._lock:
1235                    self._files.empty()
1236                    self._file.close()
1237                    try:
1238                        os.rename(self._file_name, oldpath)
1239                    except Exception:
1240                        self._file = open(self._file_name, 'r+b')
1241                        raise
1242
1243                    # OK, we're beyond the point of no return
1244                    os.rename(self._file_name + '.pack', self._file_name)
1245                    self._file = open(self._file_name, 'r+b')
1246                    self._initIndex(index, self._tindex)
1247                    self._pos = opos
1248
1249            # We're basically done.  Now we need to deal with removed
1250            # blobs and removing the .old file (see further down).
1251
1252            if self.blob_dir:
1253                self._commit_lock.release()
1254                have_commit_lock = False
1255                self._remove_blob_files_tagged_for_removal_during_pack()
1256
1257        finally:
1258            if have_commit_lock:
1259                self._commit_lock.release()
1260            with self._lock:
1261                self._pack_is_in_progress = False
1262
1263        if not self.pack_keep_old:
1264            os.remove(oldpath)
1265
1266        with self._lock:
1267            self._save_index()
1268
1269    def _remove_blob_files_tagged_for_removal_during_pack(self):
1270        lblob_dir = len(self.blob_dir)
1271        fshelper = self.fshelper
1272        old = self.blob_dir+'.old'
1273
1274        # Helper to clean up dirs left empty after moving things to old
1275        def maybe_remove_empty_dir_containing(path, level=0):
1276            path = os.path.dirname(path)
1277            if len(path) <= lblob_dir or os.listdir(path):
1278                return
1279
1280            # Path points to an empty dir.  There may be a race.  We
1281            # might have just removed the dir for an oid (or a parent
1282            # dir) and while we're cleaning up it's parent, another
1283            # thread is adding a new entry to it.
1284
1285            # We don't have to worry about level 0, as this is just a
1286            # directory containing an object's revisions. If it is
1287            # enmpty, the object must have been garbage.
1288
1289            # If the level is 1 or higher, we need to be more
1290            # careful.  We'll get the storage lock and double check
1291            # that the dir is still empty before removing it.
1292
1293            removed = False
1294            if level:
1295                self._lock.acquire()
1296            try:
1297                if not os.listdir(path):
1298                    os.rmdir(path)
1299                    removed = True
1300            finally:
1301                if level:
1302                    self._lock.release()
1303
1304            if removed:
1305                maybe_remove_empty_dir_containing(path, level+1)
1306
1307
1308        if self.pack_keep_old:
1309            # Helpers that move oid dir or revision file to the old dir.
1310            os.mkdir(old)
1311            link_or_copy(os.path.join(self.blob_dir, '.layout'),
1312                         os.path.join(old, '.layout'))
1313            def handle_file(path):
1314                newpath = old+path[lblob_dir:]
1315                dest = os.path.dirname(newpath)
1316                if not os.path.exists(dest):
1317                    os.makedirs(dest)
1318                os.rename(path, newpath)
1319            handle_dir = handle_file
1320        else:
1321            # Helpers that remove an oid dir or revision file.
1322            handle_file = remove_committed
1323            handle_dir = remove_committed_dir
1324
1325        # Fist step: move or remove oids or revisions
1326        with open(os.path.join(self.blob_dir, '.removed'), 'rb') as fp:
1327            for line in fp:
1328                line = binascii.unhexlify(line.strip())
1329
1330                if len(line) == 8:
1331                    # oid is garbage, re/move dir
1332                    path = fshelper.getPathForOID(line)
1333                    if not os.path.exists(path):
1334                        # Hm, already gone. Odd.
1335                        continue
1336                    handle_dir(path)
1337                    maybe_remove_empty_dir_containing(path, 1)
1338                    continue
1339
1340                if len(line) != 16:
1341                    raise ValueError(
1342                        "Bad record in ", self.blob_dir, '.removed')
1343
1344                oid, tid = line[:8], line[8:]
1345                path = fshelper.getBlobFilename(oid, tid)
1346                if not os.path.exists(path):
1347                    # Hm, already gone. Odd.
1348                    continue
1349                handle_file(path)
1350                assert not os.path.exists(path)
1351                maybe_remove_empty_dir_containing(path)
1352
1353        os.remove(os.path.join(self.blob_dir, '.removed'))
1354
1355        if not self.pack_keep_old:
1356            return
1357
1358        # Second step, copy remaining files.
1359        for path, dir_names, file_names in os.walk(self.blob_dir):
1360            for file_name in file_names:
1361                if not file_name.endswith('.blob'):
1362                    continue
1363                file_path = os.path.join(path, file_name)
1364                dest = os.path.dirname(old+file_path[lblob_dir:])
1365                if not os.path.exists(dest):
1366                    os.makedirs(dest)
1367                link_or_copy(file_path, old+file_path[lblob_dir:])
1368
1369    def iterator(self, start=None, stop=None):
1370        return FileIterator(self._file_name, start, stop)
1371
1372    def lastInvalidations(self, count):
1373        file = self._file
1374        seek = file.seek
1375        read = file.read
1376        with self._lock:
1377            pos = self._pos
1378            while count > 0 and pos > 4:
1379                count -= 1
1380                seek(pos-8)
1381                pos = pos - 8 - u64(read(8))
1382
1383            seek(0)
1384            return [(trans.tid, [r.oid for r in trans])
1385                    for trans in FileIterator(self._file_name, pos=pos)]
1386
1387    def lastTid(self, oid):
1388        """Return last serialno committed for object oid.
1389
1390        If there is no serialno for this oid -- which can only occur
1391        if it is a new object -- return None.
1392        """
1393        try:
1394            return self.getTid(oid)
1395        except KeyError:
1396            return None
1397
1398    def cleanup(self):
1399        """Remove all files created by this storage."""
1400        for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
1401            try:
1402                os.remove(self._file_name + ext)
1403            except OSError as e:
1404                if e.errno != errno.ENOENT:
1405                    raise
1406
1407    def record_iternext(self, next=None):
1408        index = self._index
1409        oid = index.minKey(next)
1410
1411        oid_as_long, = unpack(">Q", oid)
1412        next_oid = pack(">Q", oid_as_long + 1)
1413        try:
1414            next_oid = index.minKey(next_oid)
1415        except ValueError: # "empty tree" error
1416            next_oid = None
1417
1418        data, tid = load_current(self, oid)
1419
1420        return oid, tid, data, next_oid
1421
1422    ######################################################################
1423    # The following 2 methods are for testing a ZEO extension mechanism
1424    def getExtensionMethods(self):
1425        return dict(answer_to_the_ultimate_question=None)
1426
1427    def answer_to_the_ultimate_question(self):
1428        return 42
1429    #
1430    ######################################################################
1431
1432def shift_transactions_forward(index, tindex, file, pos, opos):
1433    """Copy transactions forward in the data file
1434
1435    This might be done as part of a recovery effort
1436    """
1437
1438    # Cache a bunch of methods
1439    seek=file.seek
1440    read=file.read
1441    write=file.write
1442
1443    index_get=index.get
1444
1445    # Initialize,
1446    pv=z64
1447    p1=opos
1448    p2=pos
1449    offset=p2-p1
1450
1451    # Copy the data in two stages.  In the packing stage,
1452    # we skip records that are non-current or that are for
1453    # unreferenced objects. We also skip undone transactions.
1454    #
1455    # After the packing stage, we copy everything but undone
1456    # transactions, however, we have to update various back pointers.
1457    # We have to have the storage lock in the second phase to keep
1458    # data from being changed while we're copying.
1459    pnv=None
1460    while 1:
1461
1462        # Read the transaction record
1463        seek(pos)
1464        h=read(TRANS_HDR_LEN)
1465        if len(h) < TRANS_HDR_LEN: break
1466        tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
1467        status = as_text(status)
1468        if status=='c': break # Oops. we found a checkpoint flag.
1469        tl=u64(stl)
1470        tpos=pos
1471        tend=tpos+tl
1472
1473        otpos=opos # start pos of output trans
1474
1475        thl=ul+dl+el
1476        h2=read(thl)
1477        if len(h2) != thl:
1478            raise PackError(opos)
1479
1480        # write out the transaction record
1481        seek(opos)
1482        write(h)
1483        write(h2)
1484
1485        thl=TRANS_HDR_LEN+thl
1486        pos=tpos+thl
1487        opos=otpos+thl
1488
1489        while pos < tend:
1490            # Read the data records for this transaction
1491            seek(pos)
1492            h=read(DATA_HDR_LEN)
1493            oid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
1494            assert not vlen
1495            plen=u64(splen)
1496            dlen=DATA_HDR_LEN+(plen or 8)
1497
1498            tindex[oid]=opos
1499
1500            if plen: p=read(plen)
1501            else:
1502                p=read(8)
1503                p=u64(p)
1504                if p >= p2: p=p-offset
1505                elif p >= p1:
1506                    # Ick, we're in trouble. Let's bail
1507                    # to the index and hope for the best
1508                    p=index_get(oid, 0)
1509                p=p64(p)
1510
1511            # WRITE
1512            seek(opos)
1513            sprev=p64(index_get(oid, 0))
1514            write(pack(DATA_HDR,
1515                       oid, serial, sprev, p64(otpos), 0, splen))
1516
1517            write(p)
1518
1519            opos=opos+dlen
1520            pos=pos+dlen
1521
1522        # skip the (intentionally redundant) transaction length
1523        pos=pos+8
1524
1525        if status != 'u':
1526            index.update(tindex) # Record the position
1527
1528        tindex.clear()
1529
1530        write(stl)
1531        opos=opos+8
1532
1533    return opos
1534
1535def search_back(file, pos):
1536    seek=file.seek
1537    read=file.read
1538    seek(0,2)
1539    s=p=file.tell()
1540    while p > pos:
1541        seek(p-8)
1542        l=u64(read(8))
1543        if l <= 0: break
1544        p=p-l-8
1545
1546    return p, s
1547
1548def recover(file_name):
1549    file=open(file_name, 'r+b')
1550    index={}
1551    tindex={}
1552
1553    pos, oid, tid = read_index(file, file_name, index, tindex, recover=1)
1554    if oid is not None:
1555        print("Nothing to recover")
1556        return
1557
1558    opos=pos
1559    pos, sz = search_back(file, pos)
1560    if pos < sz:
1561        npos = shift_transactions_forward(index, tindex, file, pos, opos)
1562
1563    file.truncate(npos)
1564
1565    print("Recovered file, lost %s, ended up with %s bytes" % (
1566        pos-opos, npos))
1567
1568
1569
1570def read_index(file, name, index, tindex, stop=b'\377'*8,
1571               ltid=z64, start=4, maxoid=z64, recover=0, read_only=0):
1572    """Scan the file storage and update the index.
1573
1574    Returns file position, max oid, and last transaction id.  It also
1575    stores index information in the three dictionary arguments.
1576
1577    Arguments:
1578    file -- a file object (the Data.fs)
1579    name -- the name of the file (presumably file.name)
1580    index -- fsIndex, oid -> data record file offset
1581    tindex -- dictionary, oid -> data record offset
1582              tindex is cleared before return
1583
1584    There are several default arguments that affect the scan or the
1585    return values.  TODO:  document them.
1586
1587    start -- the file position at which to start scanning for oids added
1588             beyond the ones the passed-in indices know about.  The .index
1589             file caches the highest ._pos FileStorage knew about when the
1590             the .index file was last saved, and that's the intended value
1591             to pass in for start; accept the default (and pass empty
1592             indices) to recreate the index from scratch
1593    maxoid -- ignored (it meant something prior to ZODB 3.2.6; the argument
1594              still exists just so the signature of read_index() stayed the
1595              same)
1596
1597    The file position returned is the position just after the last
1598    valid transaction record.  The oid returned is the maximum object
1599    id in `index`, or z64 if the index is empty.  The transaction id is the
1600    tid of the last transaction, or ltid if the index is empty.
1601    """
1602
1603    read = file.read
1604    seek = file.seek
1605    seek(0, 2)
1606    file_size = file.tell()
1607    fmt = TempFormatter(file)
1608
1609    if file_size:
1610        if file_size < start:
1611            raise FileStorageFormatError(file.name)
1612        seek(0)
1613        if read(4) != packed_version:
1614            raise FileStorageFormatError(name)
1615    else:
1616        if not read_only:
1617            file.write(packed_version)
1618        return 4, z64, ltid
1619
1620    index_get = index.get
1621
1622    pos = start
1623    seek(start)
1624    tid = b'\0' * 7 + b'\1'
1625
1626    while 1:
1627        # Read the transaction record
1628        h = read(TRANS_HDR_LEN)
1629        if not h:
1630            break
1631        if len(h) != TRANS_HDR_LEN:
1632            if not read_only:
1633                logger.warning('%s truncated at %s', name, pos)
1634                seek(pos)
1635                file.truncate()
1636            break
1637
1638        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
1639        status = as_text(status)
1640
1641        if tid <= ltid:
1642            logger.warning("%s time-stamp reduction at %s", name, pos)
1643        ltid = tid
1644
1645        if pos+(tl+8) > file_size or status=='c':
1646            # Hm, the data were truncated or the checkpoint flag wasn't
1647            # cleared.  They may also be corrupted,
1648            # in which case, we don't want to totally lose the data.
1649            if not read_only:
1650                logger.warning("%s truncated, possibly due to damaged"
1651                               " records at %s", name, pos)
1652                _truncate(file, name, pos)
1653            break
1654
1655        if status not in ' up':
1656            logger.warning('%s has invalid status, %s, at %s',
1657                           name, status, pos)
1658
1659        if tl < TRANS_HDR_LEN + ul + dl + el:
1660            # We're in trouble. Find out if this is bad data in the
1661            # middle of the file, or just a turd that Win 9x dropped
1662            # at the end when the system crashed.
1663            # Skip to the end and read what should be the transaction length
1664            # of the last transaction.
1665            seek(-8, 2)
1666            rtl = u64(read(8))
1667            # Now check to see if the redundant transaction length is
1668            # reasonable:
1669            if file_size - rtl < pos or rtl < TRANS_HDR_LEN:
1670                logger.critical('%s has invalid transaction header at %s',
1671                                name, pos)
1672                if not read_only:
1673                    logger.warning(
1674                         "It appears that there is invalid data at the end of "
1675                         "the file, possibly due to a system crash.  %s "
1676                         "truncated to recover from bad data at end." % name)
1677                    _truncate(file, name, pos)
1678                break
1679            else:
1680                if recover:
1681                    return pos, None, None
1682                panic('%s has invalid transaction header at %s', name, pos)
1683
1684        if tid >= stop:
1685            break
1686
1687        tpos = pos
1688        tend = tpos + tl
1689
1690        if status == 'u':
1691            # Undone transaction, skip it
1692            seek(tend)
1693            h = u64(read(8))
1694            if h != tl:
1695                if recover:
1696                    return tpos, None, None
1697                panic('%s has inconsistent transaction length at %s',
1698                      name, pos)
1699            pos = tend + 8
1700            continue
1701
1702        pos = tpos + TRANS_HDR_LEN + ul + dl + el
1703        while pos < tend:
1704            # Read the data records for this transaction
1705            h = fmt._read_data_header(pos)
1706            dlen = h.recordlen()
1707            tindex[h.oid] = pos
1708
1709            if pos + dlen > tend or h.tloc != tpos:
1710                if recover:
1711                    return tpos, None, None
1712                panic("%s data record exceeds transaction record at %s",
1713                      name, pos)
1714
1715            if index_get(h.oid, 0) != h.prev:
1716                if h.prev:
1717                    if recover:
1718                        return tpos, None, None
1719                    logger.error("%s incorrect previous pointer at %s",
1720                                 name, pos)
1721                else:
1722                    logger.warning("%s incorrect previous pointer at %s",
1723                                   name, pos)
1724
1725            pos += dlen
1726
1727        if pos != tend:
1728            if recover:
1729                return tpos, None, None
1730            panic("%s data records don't add up at %s",name,tpos)
1731
1732        # Read the (intentionally redundant) transaction length
1733        seek(pos)
1734        h = u64(read(8))
1735        if h != tl:
1736            if recover:
1737                return tpos, None, None
1738            panic("%s redundant transaction length check failed at %s",
1739                  name, pos)
1740        pos += 8
1741
1742        index.update(tindex)
1743        tindex.clear()
1744
1745    # Caution:  fsIndex doesn't have an efficient __nonzero__ or __len__.
1746    # That's why we do try/except instead.  fsIndex.maxKey() is fast.
1747    try:
1748        maxoid = index.maxKey()
1749    except ValueError:
1750        # The index is empty.
1751        pass # maxoid is already equal to z64
1752
1753    return pos, maxoid, ltid
1754
1755
1756def _truncate(file, name, pos):
1757    file.seek(0, 2)
1758    file_size = file.tell()
1759    try:
1760        i = 0
1761        while 1:
1762            oname='%s.tr%s' % (name, i)
1763            if os.path.exists(oname):
1764                i += 1
1765            else:
1766                logger.warning("Writing truncated data from %s to %s",
1767                               name, oname)
1768                o = open(oname,'wb')
1769                file.seek(pos)
1770                cp(file, o, file_size-pos)
1771                o.close()
1772                break
1773    except:
1774        logger.exception("couldn\'t write truncated data for %s", name)
1775        raise StorageSystemError("Couldn't save truncated data")
1776
1777    file.seek(pos)
1778    file.truncate()
1779
1780
1781class FileIterator(FileStorageFormatter):
1782    """Iterate over the transactions in a FileStorage file.
1783    """
1784    _ltid = z64
1785    _file = None
1786
1787    def __init__(self, filename, start=None, stop=None, pos=4):
1788        assert isinstance(filename, STRING_TYPES)
1789        file = open(filename, 'rb')
1790        self._file = file
1791        self._file_name = filename
1792        if file.read(4) != packed_version:
1793            raise FileStorageFormatError(file.name)
1794        file.seek(0,2)
1795        self._file_size = file.tell()
1796        if (pos < 4) or pos > self._file_size:
1797            raise ValueError("Given position is greater than the file size",
1798                             pos, self._file_size)
1799        self._pos = pos
1800        assert start is None or isinstance(start, bytes)
1801        assert stop is None or isinstance(stop, bytes)
1802        self._start = start
1803        self._stop = stop
1804        if start:
1805            if self._file_size <= 4:
1806                return
1807            self._skip_to_start(start)
1808
1809    def __len__(self):
1810        # Define a bogus __len__() to make the iterator work
1811        # with code like builtin list() and tuple() in Python 2.1.
1812        # There's a lot of C code that expects a sequence to have
1813        # an __len__() but can cope with any sort of mistake in its
1814        # implementation.  So just return 0.
1815        return 0
1816
1817    # This allows us to pass an iterator as the `other` argument to
1818    # copyTransactionsFrom() in BaseStorage.  The advantage here is that we
1819    # can create the iterator manually, e.g. setting start and stop, and then
1820    # just let copyTransactionsFrom() do its thing.
1821    def iterator(self):
1822        return self
1823
1824    def close(self):
1825        file = self._file
1826        if file is not None:
1827            self._file = None
1828            file.close()
1829
1830    def _skip_to_start(self, start):
1831        file = self._file
1832        pos1 = self._pos
1833        file.seek(pos1)
1834        tid1 = file.read(8) # XXX bytes
1835        if len(tid1) < 8:
1836            raise CorruptedError("Couldn't read tid.")
1837        if start < tid1:
1838            pos2 = pos1
1839            tid2 = tid1
1840            file.seek(4)
1841            tid1 = file.read(8)
1842            if start <= tid1:
1843                self._pos = 4
1844                return
1845            pos1 = 4
1846        else:
1847            if start == tid1:
1848                return
1849
1850            # Try to read the last transaction. We could be unlucky and
1851            # opened the file while committing a transaction.  In that
1852            # case, we'll just scan from the beginning if the file is
1853            # small enough, otherwise we'll fail.
1854            file.seek(self._file_size-8)
1855            l = u64(file.read(8))
1856            if not (l + 12 <= self._file_size and
1857                    self._read_num(self._file_size-l) == l):
1858                if self._file_size < (1<<20):
1859                    return self._scan_foreward(start)
1860                raise ValueError("Can't find last transaction in large file")
1861            pos2 = self._file_size-l-8
1862            file.seek(pos2)
1863            tid2 = file.read(8)
1864            if tid2 < tid1:
1865                raise CorruptedError("Tids out of order.")
1866            if tid2 <= start:
1867                if tid2 == start:
1868                    self._pos = pos2
1869                else:
1870                    self._pos = self._file_size
1871                return
1872
1873        t1 = TimeStamp(tid1).timeTime()
1874        t2 = TimeStamp(tid2).timeTime()
1875        ts = TimeStamp(start).timeTime()
1876        if (ts - t1) < (t2 - ts):
1877            return self._scan_forward(pos1, start)
1878        else:
1879            return self._scan_backward(pos2, start)
1880
1881    def _scan_forward(self, pos, start):
1882        logger.debug("Scan forward %s:%s looking for %r",
1883                     self._file_name, pos, start)
1884        file = self._file
1885        while 1:
1886            # Read the transaction record
1887            h = self._read_txn_header(pos)
1888            if h.tid >= start:
1889                self._pos = pos
1890                return
1891
1892            pos += h.tlen + 8
1893
1894    def _scan_backward(self, pos, start):
1895        logger.debug("Scan backward %s:%s looking for %r",
1896                     self._file_name, pos, start)
1897        file = self._file
1898        seek = file.seek
1899        read = file.read
1900        while 1:
1901            pos -= 8
1902            seek(pos)
1903            tlen = u64(read(8))
1904            pos -= tlen
1905            h = self._read_txn_header(pos)
1906            if h.tid <= start:
1907                if h.tid == start:
1908                    self._pos = pos
1909                else:
1910                    self._pos = pos + tlen + 8
1911                return
1912
1913    # Iterator protocol
1914    def __iter__(self):
1915        return self
1916
1917    def __next__(self):
1918        if self._file is None:
1919            raise StopIteration()
1920
1921        pos = self._pos
1922        while True:
1923
1924            # Read the transaction record
1925            try:
1926                h = self._read_txn_header(pos)
1927            except CorruptedDataError as err:
1928                # If buf is empty, we've reached EOF.
1929                if not err.buf:
1930                    break
1931                raise
1932
1933            if h.tid <= self._ltid:
1934                logger.warning("%s time-stamp reduction at %s",
1935                               self._file.name, pos)
1936            self._ltid = h.tid
1937
1938            if self._stop is not None and h.tid > self._stop:
1939                break
1940
1941            if h.status == "c":
1942                # Assume we've hit the last, in-progress transaction
1943                break
1944
1945            if pos + h.tlen + 8 > self._file_size:
1946                # Hm, the data were truncated or the checkpoint flag wasn't
1947                # cleared.  They may also be corrupted,
1948                # in which case, we don't want to totally lose the data.
1949                logger.warning("%s truncated, possibly due to"
1950                               " damaged records at %s", self._file.name, pos)
1951                break
1952
1953            if h.status not in " up":
1954                logger.warning('%s has invalid status,'
1955                               ' %s, at %s', self._file.name, h.status, pos)
1956
1957            if h.tlen < h.headerlen():
1958                # We're in trouble. Find out if this is bad data in
1959                # the middle of the file, or just a turd that Win 9x
1960                # dropped at the end when the system crashed.  Skip to
1961                # the end and read what should be the transaction
1962                # length of the last transaction.
1963                self._file.seek(-8, 2)
1964                rtl = u64(self._file.read(8))
1965                # Now check to see if the redundant transaction length is
1966                # reasonable:
1967                if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
1968                    logger.critical("%s has invalid transaction header at %s",
1969                                    self._file.name, pos)
1970                    logger.warning(
1971                         "It appears that there is invalid data at the end of "
1972                         "the file, possibly due to a system crash.  %s "
1973                         "truncated to recover from bad data at end."
1974                         % self._file.name)
1975                    break
1976                else:
1977                    logger.warning("%s has invalid transaction header at %s",
1978                                   self._file.name, pos)
1979                    break
1980
1981            tpos = pos
1982            tend = tpos + h.tlen
1983
1984            if h.status != "u":
1985                pos = tpos + h.headerlen()
1986                result = TransactionRecord(h.tid, h.status, h.user, h.descr,
1987                                           h.ext, pos, tend, self._file, tpos)
1988
1989            # Read the (intentionally redundant) transaction length
1990            self._file.seek(tend)
1991            rtl = u64(self._file.read(8))
1992            if rtl != h.tlen:
1993                logger.warning("%s redundant transaction length check"
1994                               " failed at %s", self._file.name, tend)
1995                break
1996            self._pos = tend + 8
1997
1998            return result
1999
2000        self.close()
2001        raise StopIteration()
2002
2003    next = __next__
2004
2005
2006class TransactionRecord(_TransactionRecord):
2007
2008    def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
2009        _TransactionRecord.__init__(
2010            self, tid, status, user, desc, ext)
2011        self._pos = pos
2012        self._tend = tend
2013        self._file = file
2014        self._tpos = tpos
2015
2016    def __iter__(self):
2017        return TransactionRecordIterator(self)
2018
2019class TransactionRecordIterator(FileStorageFormatter):
2020    """Iterate over the transactions in a FileStorage file."""
2021
2022    def __init__(self, record):
2023        self._file = record._file
2024        self._pos = record._pos
2025        self._tpos = record._tpos
2026        self._tend = record._tend
2027
2028    def __iter__(self):
2029        return self
2030
2031    def __next__(self):
2032        pos = self._pos
2033        while pos < self._tend:
2034            # Read the data records for this transaction
2035            h = self._read_data_header(pos)
2036            dlen = h.recordlen()
2037
2038            if pos + dlen > self._tend or h.tloc != self._tpos:
2039                logger.warning("%s data record exceeds transaction"
2040                               " record at %s", file.name, pos)
2041                break
2042
2043            self._pos = pos + dlen
2044            prev_txn = None
2045            if h.plen:
2046                data = self._file.read(h.plen)
2047            else:
2048                if h.back == 0:
2049                    # If the backpointer is 0, then this transaction
2050                    # undoes the object creation.  It undid the
2051                    # transaction that created it.  Return None
2052                    # instead of a pickle to indicate this.
2053                    data = None
2054                else:
2055                    data, tid = self._loadBackTxn(h.oid, h.back, False)
2056                    # Caution:  :ooks like this only goes one link back.
2057                    # Should it go to the original data like BDBFullStorage?
2058                    prev_txn = self.getTxnFromData(h.oid, h.back)
2059
2060            return Record(h.oid, h.tid, data, prev_txn, pos)
2061
2062        raise StopIteration()
2063
2064    next = __next__
2065
2066
2067class Record(_DataRecord):
2068
2069    def __init__(self, oid, tid, data, prev, pos):
2070        super(Record, self).__init__(oid, tid, data, prev)
2071        self.pos = pos
2072
2073
2074class UndoSearch(object):
2075
2076    def __init__(self, file, pos, first, last, filter=None):
2077        self.file = file
2078        self.pos = pos
2079        self.first = first
2080        self.last = last
2081        self.filter = filter
2082        # self.i is the index of the transaction we're _going_ to find
2083        # next.  When it reaches self.first, we should start appending
2084        # to self.results.  When it reaches self.last, we're done
2085        # (although we may finish earlier).
2086        self.i = 0
2087        self.results = []
2088        self.stop = False
2089
2090    def finished(self):
2091        """Return True if UndoSearch has found enough records."""
2092        # BAW: Why 39 please?  This makes no sense (see also below).
2093        return self.i >= self.last or self.pos < 39 or self.stop
2094
2095    def search(self):
2096        """Search for another record."""
2097        dict = self._readnext()
2098        if dict is not None and (self.filter is None or self.filter(dict)):
2099            if self.i >= self.first:
2100                self.results.append(dict)
2101            self.i += 1
2102
2103    def _readnext(self):
2104        """Read the next record from the storage."""
2105        self.file.seek(self.pos - 8)
2106        self.pos -= u64(self.file.read(8)) + 8
2107        self.file.seek(self.pos)
2108        h = self.file.read(TRANS_HDR_LEN)
2109        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
2110        status = as_text(status)
2111        if status == 'p':
2112            self.stop = 1
2113            return None
2114        if status != ' ':
2115            return None
2116        d = u = b''
2117        if ul:
2118            u = self.file.read(ul)
2119        if dl:
2120            d = self.file.read(dl)
2121        e = {}
2122        if el:
2123            try:
2124                e = loads(self.file.read(el))
2125            except:
2126                pass
2127        d = {'id': encodebytes(tid).rstrip(),
2128             'time': TimeStamp(tid).timeTime(),
2129             'user_name': u,
2130             'size': tl,
2131             'description': d}
2132        d.update(e)
2133        return d
2134
2135class FilePool(object):
2136
2137    closed = False
2138    writing = False
2139    writers = 0
2140
2141    def __init__(self, file_name):
2142        self.name = file_name
2143        self._files = []
2144        self._out = []
2145        self._cond = utils.Condition()
2146
2147    @contextlib.contextmanager
2148    def write_lock(self):
2149        with self._cond:
2150            self.writers += 1
2151            while self.writing or self._out:
2152                self._cond.wait()
2153            if self.closed:
2154                raise ValueError('closed')
2155            self.writing = True
2156
2157        try:
2158            yield None
2159        finally:
2160            with self._cond:
2161                self.writing = False
2162                if self.writers > 0:
2163                    self.writers -= 1
2164                self._cond.notifyAll()
2165
2166    @contextlib.contextmanager
2167    def get(self):
2168        with self._cond:
2169            while self.writers:
2170                self._cond.wait()
2171            assert not self.writing
2172            if self.closed:
2173                raise ValueError('closed')
2174
2175            try:
2176                f = self._files.pop()
2177            except IndexError:
2178                f = open(self.name, 'rb')
2179            self._out.append(f)
2180
2181        try:
2182            yield f
2183        finally:
2184            self._out.remove(f)
2185            self._files.append(f)
2186            if not self._out:
2187                with self._cond:
2188                    if self.writers and not self._out:
2189                        self._cond.notifyAll()
2190
2191    def empty(self):
2192        while self._files:
2193            self._files.pop().close()
2194
2195
2196    def flush(self):
2197        """Empty read buffers.
2198
2199        This is required if they contain data of rolled back transactions.
2200        """
2201        # Unfortunately, Python 3.x has no API to flush read buffers, and
2202        # the API is ineffective in Python 2 on Mac OS X.
2203        with self.write_lock():
2204            self.empty()
2205
2206    def close(self):
2207        with self._cond:
2208            self.closed = True
2209            while self._out:
2210                self._out.pop().close()
2211            self.empty()
2212            self.writing = self.writers = 0
2213