##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""FileStorage helper to perform pack.

A storage contains an ordered set of object revisions.  When a storage
is packed, object revisions that are not reachable as of the pack time
are deleted.  The notion of reachability is complicated by
backpointers -- object revisions that point to earlier revisions of
the same object.

An object revision is reachable at a certain time if it is reachable
from the revision of the root at that time or if it is reachable from
a backpointer after that time.
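
For example, if an object's current revision as of the pack time is a
backpointer to an older revision, the data that older record holds must
be preserved even though it was written before the pack time.

Packing is normally driven through FileStorage.pack().  A minimal
sketch (assuming a Data.fs file and the standard reference extractor
ZODB.serialize.referencesf):

    import time

    from ZODB.FileStorage import FileStorage
    from ZODB.serialize import referencesf

    storage = FileStorage('Data.fs')
    storage.pack(time.time(), referencesf)
    storage.close()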
"""

from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
from ZODB.utils import p64, u64, z64

import binascii
import logging
import os
import ZODB.fsIndex
import ZODB.POSException

logger = logging.getLogger(__name__)

class PackError(ZODB.POSException.POSError):
    pass

class PackCopier(FileStorageFormatter):

    def __init__(self, f, index, tindex):
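        # f is the destination (pack) file.  index maps oid -> pos for
        # records already written to the pack file; tindex collects
        # oid -> pos for the transaction currently being copied.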
        self._file = f
        self._index = index
        self._tindex = tindex
        self._pos = None

    def _txn_find(self, tid, stop_at_pack):
        # _pos always points just past the last transaction
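        # Each transaction record ends with a redundant 8-byte copy of
        # its length, so the file can be walked backwards: the 8 bytes
        # before a record boundary give the length of the preceding
        # transaction.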
        pos = self._pos
        while pos > 4:
            self._file.seek(pos - 8)
            pos = pos - u64(self._file.read(8)) - 8
            self._file.seek(pos)
            h = self._file.read(TRANS_HDR_LEN)
            _tid = h[:8]
            if _tid == tid:
                return pos
            if stop_at_pack:
                # h is bytes, so compare the status byte via a slice;
                # plain indexing would yield an int on Python 3.
                if h[16:17] == b'p':
                    break
        raise PackError("Invalid backpointer transaction id")

    def _data_find(self, tpos, oid, data):
        # Find oid's data record in the transaction at tpos.  Must call
        # with the lock held.  Returns the file offset of oid's data
        # record if found, else 0.
        # If a data record for oid isn't found, returns 0.
        # Else if oid's data record contains a backpointer, the position
        # of that record is returned and its backpointer is trusted.
        # Else oid's data record contains the data, and the file offset of
        # oid's data record is returned.  This data record should contain
        # a pickle identical to the 'data' argument.

        # Note:  if the stored data's length doesn't match len(data), an
        # error is logged and 0 is returned; if the lengths match but the
        # data differs, 0 is returned silently.
        h = self._read_txn_header(tpos)
        tend = tpos + h.tlen
        pos = self._file.tell()
        while pos < tend:
            h = self._read_data_header(pos)
            if h.oid == oid:
                # Make sure this looks like the right data record
                if h.plen == 0:
                    # This is also a backpointer.  Gotta trust it.
                    return pos
                if h.plen != len(data):
                    # The expected data doesn't match what's in the
                    # backpointer.  Something is wrong.
                    logger.error("Mismatch between data and backpointer at %d",
                                 pos)
                    return 0
                _data = self._file.read(h.plen)
                if data != _data:
                    return 0
                return pos
            pos += h.recordlen()
        return 0

    def copy(self, oid, serial, data, prev_txn, txnpos, datapos):
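        # Copy one data record into the pack file.  If prev_txn names a
        # transaction that still holds an equivalent record for oid, a
        # backpointer to that record is written instead of the data.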
        prev_pos = self._resolve_backpointer(prev_txn, oid, data)
        old = self._index.get(oid, 0)
        # Calculate the pos the record will have in the storage.
        here = datapos
        # And update the temp file index
        self._tindex[oid] = here
        if prev_pos:
            # If there is a valid prev_pos, don't write data.
            data = None
        if data is None:
            dlen = 0
        else:
            dlen = len(data)
        # Write the recovery data record
        h = DataHeader(oid, serial, old, txnpos, 0, dlen)

        self._file.write(h.asString())
        # Write the data or a backpointer
        if data is None:
            if prev_pos:
                self._file.write(p64(prev_pos))
            else:
                # Write a zero backpointer, which indicates an
                # un-creation transaction.
                self._file.write(z64)
        else:
            self._file.write(data)

    def setTxnPos(self, pos):
        self._pos = pos

    def _resolve_backpointer(self, prev_txn, oid, data):
        pos = self._file.tell()
        try:
            prev_pos = 0
            if prev_txn is not None:
                prev_txn_pos = self._txn_find(prev_txn, 0)
                if prev_txn_pos:
                    prev_pos = self._data_find(prev_txn_pos, oid, data)
            return prev_pos
        finally:
            self._file.seek(pos)

class GC(FileStorageFormatter):
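    # Computes the set of reachable object revisions: those reachable
    # at the pack time, plus revisions kept alive by backpointers from
    # transactions written after the pack time.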

    def __init__(self, file, eof, packtime, gc, referencesf):
        self._file = file
        self._name = file.name
        self.eof = eof
        self.packtime = packtime
        self.gc = gc
        # packpos: position of first txn header after pack time
        self.packpos = None

        # {oid -> current data record position}:
        self.oid2curpos = ZODB.fsIndex.fsIndex()

        # The set of reachable revisions of each object.
        #
        # This set is managed using two data structures.  The first is
        # an fsIndex mapping oids to one data record pos.  Since only
        # a few objects will have more than one revision, we use this
        # efficient data structure to handle the common case.  The
        # second is a dictionary mapping oids to lists of
        # positions; it is used to handle the small number of objects
        # for which we must keep multiple revisions.
        self.reachable = ZODB.fsIndex.fsIndex()
        self.reach_ex = {}
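        # For example, an oid with three reachable revisions ends up
        # with reachable[oid] = pos1 and reach_ex[oid] = [pos2, pos3].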

        # keep ltid for consistency checks during initial scan
        self.ltid = z64

        self.referencesf = referencesf
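        # referencesf(pickle) -> list of referenced oids; the standard
        # implementation is ZODB.serialize.referencesf.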

    def isReachable(self, oid, pos):
        """Return 1 if revision of `oid` at `pos` is reachable."""

        rpos = self.reachable.get(oid)
        if rpos is None:
            return 0
        if rpos == pos:
            return 1
        return pos in self.reach_ex.get(oid, [])

    def findReachable(self):
        self.buildPackIndex()
        if self.gc:
            self.findReachableAtPacktime([z64])
            self.findReachableFromFuture()
            # This mapping is no longer needed and may consume a lot
            # of space.
            del self.oid2curpos
        else:
            self.reachable = self.oid2curpos

    def buildPackIndex(self):
        pos = 4
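        # The file starts with a 4-byte magic number; transaction
        # records follow.  Each record is a header (tid, tlen, status,
        # metadata), its data records, and a redundant 8-byte copy of
        # tlen, so the next record begins at pos + th.tlen + 8.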
        # We make the initial assumption that the database has been
        # packed before and set unpacked to True only after seeing the
        # first record with a status other than "p".  If we get to the
        # packtime and unpacked is still False, we need to watch for a
        # redundant pack.
        unpacked = False
        while pos < self.eof:
            th = self._read_txn_header(pos)
            if th.tid > self.packtime:
                break
            self.checkTxn(th, pos)
            if th.status != "p":
                unpacked = True

            tpos = pos
            end = pos + th.tlen
            pos += th.headerlen()

            while pos < end:
                dh = self._read_data_header(pos)
                self.checkData(th, tpos, dh, pos)
                if dh.plen or dh.back:
                    self.oid2curpos[dh.oid] = pos
                else:
                    if dh.oid in self.oid2curpos:
                        del self.oid2curpos[dh.oid]
                pos += dh.recordlen()

            tlen = self._read_num(pos)
            if tlen != th.tlen:
                self.fail(pos, "redundant transaction length does not "
                          "match initial transaction length: %d != %d",
                          tlen, th.tlen)
            pos += 8

        self.packpos = pos

        if unpacked:
            return
        # Check for a redundant pack.  If the first record following
        # the newly computed packpos has status 'p', then it was
        # packed earlier and the current pack is redundant.
        try:
            th = self._read_txn_header(pos)
        except CorruptedDataError as err:
            if err.buf != b"":
                raise
        if th.status == 'p':
            # Delayed import to cope with circular imports.
            # TODO:  put exceptions in a separate module.
            from ZODB.FileStorage.FileStorage import RedundantPackWarning
            raise RedundantPackWarning(
                "The database has already been packed to a later time"
                " or no changes have been made since the last pack")

    def findReachableAtPacktime(self, roots):
        """Mark all objects reachable from the oids in roots as reachable."""
        reachable = self.reachable
        oid2curpos = self.oid2curpos

        todo = list(roots)
        while todo:
            oid = todo.pop()
            if oid in reachable:
                continue

            try:
                pos = oid2curpos[oid]
            except KeyError:
                if oid == z64 and len(oid2curpos) == 0:
                    # special case, pack to before creation time
                    continue
                raise KeyError(oid)

            reachable[oid] = pos
            for oid in self.findrefs(pos):
                if oid not in reachable:
                    todo.append(oid)

    def findReachableFromFuture(self):
        # In this pass, the roots are positions of object revisions.
        # We add a pos to extra_roots when there is a backpointer to a
        # revision that was not current at the packtime.  The
        # non-current revision could refer to objects that were
        # otherwise unreachable at the packtime.
        extra_roots = []

        pos = self.packpos
        while pos < self.eof:
            th = self._read_txn_header(pos)
            self.checkTxn(th, pos)
            tpos = pos
            end = pos + th.tlen
            pos += th.headerlen()

            while pos < end:
                dh = self._read_data_header(pos)
                self.checkData(th, tpos, dh, pos)

                if dh.back and dh.back < self.packpos:
                    if dh.oid in self.reachable:
                        L = self.reach_ex.setdefault(dh.oid, [])
                        if dh.back not in L:
                            L.append(dh.back)
                            extra_roots.append(dh.back)
                    else:
                        self.reachable[dh.oid] = dh.back

                pos += dh.recordlen()

            tlen = self._read_num(pos)
            if tlen != th.tlen:
                self.fail(pos, "redundant transaction length does not "
                          "match initial transaction length: %d != %d",
                          tlen, th.tlen)
            pos += 8

        for pos in extra_roots:
            refs = self.findrefs(pos)
            self.findReachableAtPacktime(refs)

    def findrefs(self, pos):
        """Return a list of oids referenced as of packtime."""
        dh = self._read_data_header(pos)
        # Chase backpointers until we get to the record with the refs
        while dh.back:
            dh = self._read_data_header(dh.back)
        if dh.plen:
            return self.referencesf(self._file.read(dh.plen))
        else:
            return []

class FileStoragePacker(FileStorageFormatter):

    # storage is the FileStorage instance being packed.
    # referencesf(pickle) returns the list of oids referenced by a
    # data record's pickle.
    # stop is the pack time, as a raw 8-byte TimeStamp (a tid).
    # gc is a flag; when false, no garbage collection is performed and
    # only non-current object revisions are removed.
    # The storage's size at the start of the pack bounds the scan: all
    # valid data lives before that offset (there may be a checkpoint
    # transaction in progress after it).

    def __init__(self, storage, referencesf, stop, gc=True):
        self._storage = storage
        if storage.blob_dir:
            self.pack_blobs = True
            self.blob_removed = open(
                os.path.join(storage.blob_dir, '.removed'), 'wb')
        else:
            self.pack_blobs = False
            self.blob_removed = None

        path = storage._file.name
        self._name = path
        # We open our own handle on the storage so that much of pack can
        # proceed in parallel.  It's important to close this file at every
        # return point, else on Windows the caller won't be able to rename
        # or remove the storage file.
        self._file = open(path, "rb")
        self._path = path
        self._stop = stop
        self.locked = False
        self.file_end = storage.getSize()

        self.gc = GC(self._file, self.file_end, self._stop, gc, referencesf)

        # The packer needs to acquire the parent's commit lock
        # during the copying stage, so the storage's lock and commit
        # lock are shared with this object.
        self._lock = storage._lock
        self._commit_lock = storage._commit_lock

        # The packer will use several indexes.
        # index: oid -> pos
        # tindex: oid -> pos, for current txn
        # oid2tid: not used by the packer

        self.index = ZODB.fsIndex.fsIndex()
        self.tindex = {}
        self.oid2tid = {}
        self.toid2tid = {}
        self.toid2tid_delete = {}

        self._tfile = None

    def close(self):
        self._file.close()
        if self._tfile is not None:
            self._tfile.close()
        if self.blob_removed is not None:
            self.blob_removed.close()

    def pack(self):
        # Pack copies all data reachable at the pack time or later.
        #
        # Copying occurs in two phases.  In the first phase, txns
        # before the pack time are copied if they contain any reachable
        # data.  In the second phase, all txns after the pack time
        # are copied.
        #
        # Txn and data records contain pointers to previous records.
        # Because these pointers are stored as file offsets, they
        # must be updated when we copy data.

        # TODO:  Should add sanity checking to pack.
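        # Overall flow: compute reachability, copy reachable data up to
        # the pack time into a .pack file, then take the commit lock and
        # copy any transactions written after the pack time (the lock is
        # briefly released between transactions), returning the new end
        # position.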

        self.gc.findReachable()

        def close_files_remove():
            # blank except: we might be in an IOError situation/handler
            # try our best, but don't fail
            try:
                self._tfile.close()
            except:
                pass
            try:
                self._file.close()
            except:
                pass
            try:
                os.remove(self._name + ".pack")
            except:
                pass
            if self.blob_removed is not None:
                self.blob_removed.close()

        # Setup the destination file and copy the metadata.
        # TODO:  rename from _tfile to something clearer.
        self._tfile = open(self._name + ".pack", "w+b")
        try:
            self._file.seek(0)
            self._tfile.write(self._file.read(self._metadata_size))

            self._copier = PackCopier(self._tfile, self.index, self.tindex)

            ipos, opos = self.copyToPacktime()
        except (OSError, IOError):
            # most probably ran out of disk space or some other IO error
            close_files_remove()
            raise  # don't succeed silently

        assert ipos == self.gc.packpos
        if ipos == opos:
            # pack didn't free any data.  there's no point in continuing.
            close_files_remove()
            return None
        self._commit_lock.acquire()
        self.locked = True
        try:
            with self._lock:
                # Re-open the file in unbuffered mode.

                # The main thread may write new transactions to the
                # file, which creates the possibility that we will
                # read a status 'c' transaction into the pack thread's
                # stdio buffer even though we're acquiring the commit
                # lock.  Transactions can still be in progress
                # throughout much of packing, and are written to the
                # same physical file but via a distinct Python file
                # object.  The code used to leave off the trailing 0
                # argument, and then on every platform except native
                # Windows it was observed that we could read stale
                # data from the tail end of the file.
                self._file.close() # else self.gc keeps the original
                                   # alive & open
                self._file = open(self._path, "rb", 0)
                self._file.seek(0, 2)
                self.file_end = self._file.tell()

            if ipos < self.file_end:
                self.copyRest(ipos)

            # OK, we've copied everything. Now we need to wrap things up.
            pos = self._tfile.tell()
            self._tfile.flush()
            self._tfile.close()
            self._file.close()
            if self.blob_removed is not None:
                self.blob_removed.close()

            return pos
        except (OSError, IOError):
            # most probably ran out of disk space or some other IO error
            close_files_remove()
            if self.locked:
                self._commit_lock.release()
            raise  # don't succeed silently
        except:
            if self.locked:
                self._commit_lock.release()
            raise

    def copyToPacktime(self):
        offset = 0  # the amount of space freed by packing
        pos = self._metadata_size
        new_pos = pos

        while pos < self.gc.packpos:
            th = self._read_txn_header(pos)
            new_tpos, pos = self.copyDataRecords(pos, th)

            if new_tpos:
                new_pos = self._tfile.tell() + 8
                tlen = new_pos - new_tpos - 8
                # Patch the transaction length in both places it is
                # stored: the txn header and the redundant trailing
                # length field.
                self._tfile.seek(new_tpos + 8)
                self._tfile.write(p64(tlen))
                self._tfile.seek(new_pos - 8)
                self._tfile.write(p64(tlen))

            tlen = self._read_num(pos)
            if tlen != th.tlen:
                self.fail(pos, "redundant transaction length does not "
                          "match initial transaction length: %d != %d",
                          tlen, th.tlen)
            pos += 8

        return pos, new_pos

    def copyDataRecords(self, pos, th):
        """Copy any current data records between pos and tend.

        Returns position of txn header in output file and position
        of next record in the input file.

        If any data records are copied, also write txn header (th).
        """
        copy = 0
        new_tpos = 0
        tend = pos + th.tlen
        pos += th.headerlen()
        while pos < tend:
            h = self._read_data_header(pos)
            if not self.gc.isReachable(h.oid, pos):
                if self.pack_blobs:
                    # We need to find out if this is a blob, so get the data:
                    if h.plen:
                        data = self._file.read(h.plen)
                    else:
                        data = self.fetchDataViaBackpointer(h.oid, h.back)
                    if data and self._storage.is_blob_record(data):
                        # We need to remove the blob record. Maybe we
                        # need to remove oid:

                        # But first, we need to make sure the record
                        # we're looking at isn't a dup of the current
                        # record. There's a bug in ZEO blob support that causes
                        # duplicate data records.
                        rpos = self.gc.reachable.get(h.oid)
                        is_dup = (rpos
                                  and self._read_data_header(rpos).tid == h.tid)
                        if not is_dup:
                            if h.oid not in self.gc.reachable:
                                self.blob_removed.write(
                                    binascii.hexlify(h.oid)+b'\n')
                            else:
                                self.blob_removed.write(
                                    binascii.hexlify(h.oid+h.tid)+b'\n')

                pos += h.recordlen()
                continue

            pos += h.recordlen()

            # If we are going to copy any data, we need to copy
            # the transaction header.  Note that we will need to
            # patch up the transaction length when we are done.
            if not copy:
                th.status = "p"
                s = th.asString()
                new_tpos = self._tfile.tell()
                self._tfile.write(s)
                new_pos = new_tpos + len(s)
                copy = 1

            if h.plen:
                data = self._file.read(h.plen)
            else:
                data = self.fetchDataViaBackpointer(h.oid, h.back)

            self.writePackedDataRecord(h, data, new_tpos)
            new_pos = self._tfile.tell()

        return new_tpos, pos

    def fetchDataViaBackpointer(self, oid, back):
        """Return the data for oid via backpointer back

        If `back` is 0 or ultimately resolves to 0, return None.
        In this case, the transaction undoes the object
        creation.
        """
        if back == 0:
            return None
        data, tid = self._loadBackTxn(oid, back, 0)
        return data

    def writePackedDataRecord(self, h, data, new_tpos):
        # Update the header to reflect current information, then write
        # it to the output file.
        if data is None:
            data = b''
        h.prev = 0
        h.back = 0
        h.plen = len(data)
        h.tloc = new_tpos
        pos = self._tfile.tell()
        self.index[h.oid] = pos
        self._tfile.write(h.asString())
        self._tfile.write(data)
        if not data:
            # Packed records never have backpointers (?).
            # If there is no data, write a z64 backpointer.
            # This is a George Bailey event.
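            # (A "George Bailey" record, after It's a Wonderful Life,
            # marks an object whose creation was undone: as far as the
            # packed file is concerned, the object was never born.)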
            self._tfile.write(z64)

    def copyRest(self, ipos):
        # After the pack time, all data records are copied.
        # Copy one txn at a time, using copy() for data.

        try:
            while 1:
                ipos = self.copyOne(ipos)
        except CorruptedDataError as err:
            # The last call to copyOne() will raise
            # CorruptedDataError, because it will attempt to read past
            # the end of the file.  Double-check that the exception
            # occurred for this reason.
            self._file.seek(0, 2)
            endpos = self._file.tell()
            if endpos != err.pos:
                raise

    def copyOne(self, ipos):
        # The call below will raise CorruptedDataError at EOF.
        th = self._read_txn_header(ipos)
        # Release commit lock while writing to pack file
        self._commit_lock.release()
        self.locked = False
        pos = self._tfile.tell()
        self._copier.setTxnPos(pos)
        self._tfile.write(th.asString())
        tend = ipos + th.tlen
        ipos += th.headerlen()

        while ipos < tend:
            h = self._read_data_header(ipos)
            ipos += h.recordlen()
            prev_txn = None
            if h.plen:
                data = self._file.read(h.plen)
            else:
                data = self.fetchDataViaBackpointer(h.oid, h.back)
                if h.back:
                    prev_txn = self.getTxnFromData(h.oid, h.back)

            self._copier.copy(h.oid, h.tid, data, prev_txn,
                              pos, self._tfile.tell())

        tlen = self._tfile.tell() - pos
        assert tlen == th.tlen
        self._tfile.write(p64(tlen))
        ipos += 8

        self.index.update(self.tindex)
        self.tindex.clear()
        self._commit_lock.acquire()
        self.locked = True
        return ipos