1############################################################################## 2# 3# Copyright (c) 2003 Zope Foundation and Contributors. 4# All Rights Reserved. 5# 6# This software is subject to the provisions of the Zope Public License, 7# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. 8# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED 9# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 10# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 11# FOR A PARTICULAR PURPOSE. 12# 13############################################################################## 14"""FileStorage helper to perform pack. 15 16A storage contains an ordered set of object revisions. When a storage 17is packed, object revisions that are not reachable as of the pack time 18are deleted. The notion of reachability is complicated by 19backpointers -- object revisions that point to earlier revisions of 20the same object. 21 22An object revisions is reachable at a certain time if it is reachable 23from the revision of the root at that time or if it is reachable from 24a backpointer after that time. 25""" 26 27from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN 28from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError 29from ZODB.utils import p64, u64, z64 30 31import binascii 32import logging 33import os 34import ZODB.fsIndex 35import ZODB.POSException 36 37logger = logging.getLogger(__name__) 38 39class PackError(ZODB.POSException.POSError): 40 pass 41 42class PackCopier(FileStorageFormatter): 43 44 def __init__(self, f, index, tindex): 45 self._file = f 46 self._index = index 47 self._tindex = tindex 48 self._pos = None 49 50 def _txn_find(self, tid, stop_at_pack): 51 # _pos always points just past the last transaction 52 pos = self._pos 53 while pos > 4: 54 self._file.seek(pos - 8) 55 pos = pos - u64(self._file.read(8)) - 8 56 self._file.seek(pos) 57 h = self._file.read(TRANS_HDR_LEN) # XXX bytes 58 _tid = h[:8] 59 if _tid == tid: 60 return pos 61 if stop_at_pack: 62 if h[16] == 'p': 63 break 64 raise PackError("Invalid backpointer transaction id") 65 66 def _data_find(self, tpos, oid, data): 67 # Return backpointer for oid. Must call with the lock held. 68 # This is a file offset to oid's data record if found, else 0. 69 # The data records in the transaction at tpos are searched for oid. 70 # If a data record for oid isn't found, returns 0. 71 # Else if oid's data record contains a backpointer, that 72 # backpointer is returned. 73 # Else oid's data record contains the data, and the file offset of 74 # oid's data record is returned. This data record should contain 75 # a pickle identical to the 'data' argument. 76 77 # Unclear: If the length of the stored data doesn't match len(data), 78 # an exception is raised. If the lengths match but the data isn't 79 # the same, 0 is returned. Why the discrepancy? 80 h = self._read_txn_header(tpos) 81 tend = tpos + h.tlen 82 pos = self._file.tell() 83 while pos < tend: 84 h = self._read_data_header(pos) 85 if h.oid == oid: 86 # Make sure this looks like the right data record 87 if h.plen == 0: 88 # This is also a backpointer. Gotta trust it. 89 return pos 90 if h.plen != len(data): 91 # The expected data doesn't match what's in the 92 # backpointer. Something is wrong. 93 logger.error("Mismatch between data and backpointer at %d", 94 pos) 95 return 0 96 _data = self._file.read(h.plen) 97 if data != _data: 98 return 0 99 return pos 100 pos += h.recordlen() 101 return 0 102 103 def copy(self, oid, serial, data, prev_txn, txnpos, datapos): 104 prev_pos = self._resolve_backpointer(prev_txn, oid, data) 105 old = self._index.get(oid, 0) 106 # Calculate the pos the record will have in the storage. 107 here = datapos 108 # And update the temp file index 109 self._tindex[oid] = here 110 if prev_pos: 111 # If there is a valid prev_pos, don't write data. 112 data = None 113 if data is None: 114 dlen = 0 115 else: 116 dlen = len(data) 117 # Write the recovery data record 118 h = DataHeader(oid, serial, old, txnpos, 0, dlen) 119 120 self._file.write(h.asString()) 121 # Write the data or a backpointer 122 if data is None: 123 if prev_pos: 124 self._file.write(p64(prev_pos)) 125 else: 126 # Write a zero backpointer, which indicates an 127 # un-creation transaction. 128 self._file.write(z64) 129 else: 130 self._file.write(data) 131 132 def setTxnPos(self, pos): 133 self._pos = pos 134 135 def _resolve_backpointer(self, prev_txn, oid, data): 136 pos = self._file.tell() 137 try: 138 prev_pos = 0 139 if prev_txn is not None: 140 prev_txn_pos = self._txn_find(prev_txn, 0) 141 if prev_txn_pos: 142 prev_pos = self._data_find(prev_txn_pos, oid, data) 143 return prev_pos 144 finally: 145 self._file.seek(pos) 146 147class GC(FileStorageFormatter): 148 149 def __init__(self, file, eof, packtime, gc, referencesf): 150 self._file = file 151 self._name = file.name 152 self.eof = eof 153 self.packtime = packtime 154 self.gc = gc 155 # packpos: position of first txn header after pack time 156 self.packpos = None 157 158 # {oid -> current data record position}: 159 self.oid2curpos = ZODB.fsIndex.fsIndex() 160 161 # The set of reachable revisions of each object. 162 # 163 # This set as managed using two data structures. The first is 164 # an fsIndex mapping oids to one data record pos. Since only 165 # a few objects will have more than one revision, we use this 166 # efficient data structure to handle the common case. The 167 # second is a dictionary mapping objects to lists of 168 # positions; it is used to handle the same number of objects 169 # for which we must keep multiple revisions. 170 self.reachable = ZODB.fsIndex.fsIndex() 171 self.reach_ex = {} 172 173 # keep ltid for consistency checks during initial scan 174 self.ltid = z64 175 176 self.referencesf = referencesf 177 178 def isReachable(self, oid, pos): 179 """Return 1 if revision of `oid` at `pos` is reachable.""" 180 181 rpos = self.reachable.get(oid) 182 if rpos is None: 183 return 0 184 if rpos == pos: 185 return 1 186 return pos in self.reach_ex.get(oid, []) 187 188 def findReachable(self): 189 self.buildPackIndex() 190 if self.gc: 191 self.findReachableAtPacktime([z64]) 192 self.findReachableFromFuture() 193 # These mappings are no longer needed and may consume a lot of 194 # space. 195 del self.oid2curpos 196 else: 197 self.reachable = self.oid2curpos 198 199 def buildPackIndex(self): 200 pos = 4 201 # We make the initial assumption that the database has been 202 # packed before and set unpacked to True only after seeing the 203 # first record with a status == " ". If we get to the packtime 204 # and unpacked is still False, we need to watch for a redundant 205 # pack. 206 unpacked = False 207 while pos < self.eof: 208 th = self._read_txn_header(pos) 209 if th.tid > self.packtime: 210 break 211 self.checkTxn(th, pos) 212 if th.status != "p": 213 unpacked = True 214 215 tpos = pos 216 end = pos + th.tlen 217 pos += th.headerlen() 218 219 while pos < end: 220 dh = self._read_data_header(pos) 221 self.checkData(th, tpos, dh, pos) 222 if dh.plen or dh.back: 223 self.oid2curpos[dh.oid] = pos 224 else: 225 if dh.oid in self.oid2curpos: 226 del self.oid2curpos[dh.oid] 227 pos += dh.recordlen() 228 229 tlen = self._read_num(pos) 230 if tlen != th.tlen: 231 self.fail(pos, "redundant transaction length does not " 232 "match initial transaction length: %d != %d", 233 tlen, th.tlen) 234 pos += 8 235 236 self.packpos = pos 237 238 if unpacked: 239 return 240 # check for a redundant pack. If the first record following 241 # the newly computed packpos has status 'p', then it was 242 # packed earlier and the current pack is redudant. 243 try: 244 th = self._read_txn_header(pos) 245 except CorruptedDataError as err: 246 if err.buf != b"": 247 raise 248 if th.status == 'p': 249 # Delayed import to cope with circular imports. 250 # TODO: put exceptions in a separate module. 251 from ZODB.FileStorage.FileStorage import RedundantPackWarning 252 raise RedundantPackWarning( 253 "The database has already been packed to a later time" 254 " or no changes have been made since the last pack") 255 256 def findReachableAtPacktime(self, roots): 257 """Mark all objects reachable from the oids in roots as reachable.""" 258 reachable = self.reachable 259 oid2curpos = self.oid2curpos 260 261 todo = list(roots) 262 while todo: 263 oid = todo.pop() 264 if oid in reachable: 265 continue 266 267 try: 268 pos = oid2curpos[oid] 269 except KeyError: 270 if oid == z64 and len(oid2curpos) == 0: 271 # special case, pack to before creation time 272 continue 273 raise KeyError(oid) 274 275 reachable[oid] = pos 276 for oid in self.findrefs(pos): 277 if oid not in reachable: 278 todo.append(oid) 279 280 def findReachableFromFuture(self): 281 # In this pass, the roots are positions of object revisions. 282 # We add a pos to extra_roots when there is a backpointer to a 283 # revision that was not current at the packtime. The 284 # non-current revision could refer to objects that were 285 # otherwise unreachable at the packtime. 286 extra_roots = [] 287 288 pos = self.packpos 289 while pos < self.eof: 290 th = self._read_txn_header(pos) 291 self.checkTxn(th, pos) 292 tpos = pos 293 end = pos + th.tlen 294 pos += th.headerlen() 295 296 while pos < end: 297 dh = self._read_data_header(pos) 298 self.checkData(th, tpos, dh, pos) 299 300 if dh.back and dh.back < self.packpos: 301 if dh.oid in self.reachable: 302 L = self.reach_ex.setdefault(dh.oid, []) 303 if dh.back not in L: 304 L.append(dh.back) 305 extra_roots.append(dh.back) 306 else: 307 self.reachable[dh.oid] = dh.back 308 309 pos += dh.recordlen() 310 311 tlen = self._read_num(pos) 312 if tlen != th.tlen: 313 self.fail(pos, "redundant transaction length does not " 314 "match initial transaction length: %d != %d", 315 tlen, th.tlen) 316 pos += 8 317 318 for pos in extra_roots: 319 refs = self.findrefs(pos) 320 self.findReachableAtPacktime(refs) 321 322 def findrefs(self, pos): 323 """Return a list of oids referenced as of packtime.""" 324 dh = self._read_data_header(pos) 325 # Chase backpointers until we get to the record with the refs 326 while dh.back: 327 dh = self._read_data_header(dh.back) 328 if dh.plen: 329 return self.referencesf(self._file.read(dh.plen)) 330 else: 331 return [] 332 333class FileStoragePacker(FileStorageFormatter): 334 335 # path is the storage file path. 336 # stop is the pack time, as a TimeStamp. 337 # current_size is the storage's _pos. All valid data at the start 338 # lives before that offset (there may be a checkpoint transaction in 339 # progress after it). 340 341 def __init__(self, storage, referencesf, stop, gc=True): 342 self._storage = storage 343 if storage.blob_dir: 344 self.pack_blobs = True 345 self.blob_removed = open( 346 os.path.join(storage.blob_dir, '.removed'), 'wb') 347 else: 348 self.pack_blobs = False 349 self.blob_removed = None 350 351 path = storage._file.name 352 self._name = path 353 # We open our own handle on the storage so that much of pack can 354 # proceed in parallel. It's important to close this file at every 355 # return point, else on Windows the caller won't be able to rename 356 # or remove the storage file. 357 self._file = open(path, "rb") 358 self._path = path 359 self._stop = stop 360 self.locked = False 361 self.file_end = storage.getSize() 362 363 self.gc = GC(self._file, self.file_end, self._stop, gc, referencesf) 364 365 # The packer needs to acquire the parent's commit lock 366 # during the copying stage, so the two sets of lock acquire 367 # and release methods are passed to the constructor. 368 self._lock = storage._lock 369 self._commit_lock = storage._commit_lock 370 371 # The packer will use several indexes. 372 # index: oid -> pos 373 # tindex: oid -> pos, for current txn 374 # oid2tid: not used by the packer 375 376 self.index = ZODB.fsIndex.fsIndex() 377 self.tindex = {} 378 self.oid2tid = {} 379 self.toid2tid = {} 380 self.toid2tid_delete = {} 381 382 self._tfile = None 383 384 def close(self): 385 self._file.close() 386 if self._tfile is not None: 387 self._tfile.close() 388 if self.blob_removed is not None: 389 self.blob_removed.close() 390 391 def pack(self): 392 # Pack copies all data reachable at the pack time or later. 393 # 394 # Copying occurs in two phases. In the first phase, txns 395 # before the pack time are copied if the contain any reachable 396 # data. In the second phase, all txns after the pack time 397 # are copied. 398 # 399 # Txn and data records contain pointers to previous records. 400 # Because these pointers are stored as file offsets, they 401 # must be updated when we copy data. 402 403 # TODO: Should add sanity checking to pack. 404 405 self.gc.findReachable() 406 407 def close_files_remove(): 408 # blank except: we might be in an IOError situation/handler 409 # try our best, but don't fail 410 try: 411 self._tfile.close() 412 except: 413 pass 414 try: 415 self._file.close() 416 except: 417 pass 418 try: 419 os.remove(self._name + ".pack") 420 except: 421 pass 422 if self.blob_removed is not None: 423 self.blob_removed.close() 424 425 # Setup the destination file and copy the metadata. 426 # TODO: rename from _tfile to something clearer. 427 self._tfile = open(self._name + ".pack", "w+b") 428 try: 429 self._file.seek(0) 430 self._tfile.write(self._file.read(self._metadata_size)) 431 432 self._copier = PackCopier(self._tfile, self.index, self.tindex) 433 434 ipos, opos = self.copyToPacktime() 435 except (OSError, IOError): 436 # most probably ran out of disk space or some other IO error 437 close_files_remove() 438 raise # don't succeed silently 439 440 assert ipos == self.gc.packpos 441 if ipos == opos: 442 # pack didn't free any data. there's no point in continuing. 443 close_files_remove() 444 return None 445 self._commit_lock.acquire() 446 self.locked = True 447 try: 448 with self._lock: 449 # Re-open the file in unbuffered mode. 450 451 # The main thread may write new transactions to the 452 # file, which creates the possibility that we will 453 # read a status 'c' transaction into the pack thread's 454 # stdio buffer even though we're acquiring the commit 455 # lock. Transactions can still be in progress 456 # throughout much of packing, and are written to the 457 # same physical file but via a distinct Python file 458 # object. The code used to leave off the trailing 0 459 # argument, and then on every platform except native 460 # Windows it was observed that we could read stale 461 # data from the tail end of the file. 462 self._file.close() # else self.gc keeps the original 463 # alive & open 464 self._file = open(self._path, "rb", 0) 465 self._file.seek(0, 2) 466 self.file_end = self._file.tell() 467 468 if ipos < self.file_end: 469 self.copyRest(ipos) 470 471 # OK, we've copied everything. Now we need to wrap things up. 472 pos = self._tfile.tell() 473 self._tfile.flush() 474 self._tfile.close() 475 self._file.close() 476 if self.blob_removed is not None: 477 self.blob_removed.close() 478 479 return pos 480 except (OSError, IOError): 481 # most probably ran out of disk space or some other IO error 482 close_files_remove() 483 if self.locked: 484 self._commit_lock.release() 485 raise # don't succeed silently 486 except: 487 if self.locked: 488 self._commit_lock.release() 489 raise 490 491 def copyToPacktime(self): 492 offset = 0 # the amount of space freed by packing 493 pos = self._metadata_size 494 new_pos = pos 495 496 while pos < self.gc.packpos: 497 th = self._read_txn_header(pos) 498 new_tpos, pos = self.copyDataRecords(pos, th) 499 500 if new_tpos: 501 new_pos = self._tfile.tell() + 8 502 tlen = new_pos - new_tpos - 8 503 # Update the transaction length 504 self._tfile.seek(new_tpos + 8) 505 self._tfile.write(p64(tlen)) 506 self._tfile.seek(new_pos - 8) 507 self._tfile.write(p64(tlen)) 508 509 510 tlen = self._read_num(pos) 511 if tlen != th.tlen: 512 self.fail(pos, "redundant transaction length does not " 513 "match initial transaction length: %d != %d", 514 tlen, th.tlen) 515 pos += 8 516 517 return pos, new_pos 518 519 def copyDataRecords(self, pos, th): 520 """Copy any current data records between pos and tend. 521 522 Returns position of txn header in output file and position 523 of next record in the input file. 524 525 If any data records are copied, also write txn header (th). 526 """ 527 copy = 0 528 new_tpos = 0 529 tend = pos + th.tlen 530 pos += th.headerlen() 531 while pos < tend: 532 h = self._read_data_header(pos) 533 if not self.gc.isReachable(h.oid, pos): 534 if self.pack_blobs: 535 # We need to find out if this is a blob, so get the data: 536 if h.plen: 537 data = self._file.read(h.plen) 538 else: 539 data = self.fetchDataViaBackpointer(h.oid, h.back) 540 if data and self._storage.is_blob_record(data): 541 # We need to remove the blob record. Maybe we 542 # need to remove oid: 543 544 # But first, we need to make sure the record 545 # we're looking at isn't a dup of the current 546 # record. There's a bug in ZEO blob support that causes 547 # duplicate data records. 548 rpos = self.gc.reachable.get(h.oid) 549 is_dup = (rpos 550 and self._read_data_header(rpos).tid == h.tid) 551 if not is_dup: 552 if h.oid not in self.gc.reachable: 553 self.blob_removed.write( 554 binascii.hexlify(h.oid)+b'\n') 555 else: 556 self.blob_removed.write( 557 binascii.hexlify(h.oid+h.tid)+b'\n') 558 559 pos += h.recordlen() 560 continue 561 562 pos += h.recordlen() 563 564 # If we are going to copy any data, we need to copy 565 # the transaction header. Note that we will need to 566 # patch up the transaction length when we are done. 567 if not copy: 568 th.status = "p" 569 s = th.asString() 570 new_tpos = self._tfile.tell() 571 self._tfile.write(s) 572 new_pos = new_tpos + len(s) 573 copy = 1 574 575 if h.plen: 576 data = self._file.read(h.plen) 577 else: 578 data = self.fetchDataViaBackpointer(h.oid, h.back) 579 580 self.writePackedDataRecord(h, data, new_tpos) 581 new_pos = self._tfile.tell() 582 583 return new_tpos, pos 584 585 def fetchDataViaBackpointer(self, oid, back): 586 """Return the data for oid via backpointer back 587 588 If `back` is 0 or ultimately resolves to 0, return None. 589 In this case, the transaction undoes the object 590 creation. 591 """ 592 if back == 0: 593 return None 594 data, tid = self._loadBackTxn(oid, back, 0) 595 return data 596 597 def writePackedDataRecord(self, h, data, new_tpos): 598 # Update the header to reflect current information, then write 599 # it to the output file. 600 if data is None: 601 data = b'' 602 h.prev = 0 603 h.back = 0 604 h.plen = len(data) 605 h.tloc = new_tpos 606 pos = self._tfile.tell() 607 self.index[h.oid] = pos 608 self._tfile.write(h.asString()) 609 self._tfile.write(data) 610 if not data: 611 # Packed records never have backpointers (?). 612 # If there is no data, write a z64 backpointer. 613 # This is a George Bailey event. 614 self._tfile.write(z64) 615 616 def copyRest(self, ipos): 617 # After the pack time, all data records are copied. 618 # Copy one txn at a time, using copy() for data. 619 620 try: 621 while 1: 622 ipos = self.copyOne(ipos) 623 except CorruptedDataError as err: 624 # The last call to copyOne() will raise 625 # CorruptedDataError, because it will attempt to read past 626 # the end of the file. Double-check that the exception 627 # occurred for this reason. 628 self._file.seek(0, 2) 629 endpos = self._file.tell() 630 if endpos != err.pos: 631 raise 632 633 def copyOne(self, ipos): 634 # The call below will raise CorruptedDataError at EOF. 635 th = self._read_txn_header(ipos) 636 # Release commit lock while writing to pack file 637 self._commit_lock.release() 638 self.locked = False 639 pos = self._tfile.tell() 640 self._copier.setTxnPos(pos) 641 self._tfile.write(th.asString()) 642 tend = ipos + th.tlen 643 ipos += th.headerlen() 644 645 while ipos < tend: 646 h = self._read_data_header(ipos) 647 ipos += h.recordlen() 648 prev_txn = None 649 if h.plen: 650 data = self._file.read(h.plen) 651 else: 652 data = self.fetchDataViaBackpointer(h.oid, h.back) 653 if h.back: 654 prev_txn = self.getTxnFromData(h.oid, h.back) 655 656 self._copier.copy(h.oid, h.tid, data, prev_txn, 657 pos, self._tfile.tell()) 658 659 tlen = self._tfile.tell() - pos 660 assert tlen == th.tlen 661 self._tfile.write(p64(tlen)) 662 ipos += 8 663 664 self.index.update(self.tindex) 665 self.tindex.clear() 666 self._commit_lock.acquire() 667 self.locked = True 668 return ipos 669