"""Generic persistent, concurrent dictionary-like facility."""

from __future__ import division, with_statement, absolute_import

__copyright__ = """
Copyright (C) 2011,2014 Andreas Kloeckner
Copyright (C) 2017 Matt Wala
"""

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

import logging
logger = logging.getLogger(__name__)


import collections
import functools
import six
import sys
import os
import shutil
import errno

try:
    # The ABC aliases in ``collections`` were removed in Python 3.10.
    from collections.abc import MutableMapping as _MutableMapping
except ImportError:
    # Python 2
    from collections import MutableMapping as _MutableMapping

__doc__ = """
Persistent Hashing and Persistent Dictionaries
==============================================

This module contains functionality that allows hashing with keys that remain
valid across interpreter invocations, unlike Python's built-in hashes.

This module also provides a disk-backed dictionary that uses persistent hashing.

.. autoexception:: NoSuchEntryError
.. autoexception:: ReadOnlyEntryError

.. autoexception:: CollisionWarning

.. autoclass:: KeyBuilder
.. autoclass:: PersistentDict
.. autoclass:: WriteOncePersistentDict
"""

try:
    import hashlib
    new_hash = hashlib.sha256
except ImportError:
    # for Python << 2.5
    import sha
    new_hash = sha.new


def _make_dir_recursively(dir):
    """Create *dir* and any missing parents; an already existing path
    (``EEXIST``) is not an error, any other :exc:`OSError` propagates.
    """
    try:
        os.makedirs(dir)
    except OSError as e:
        from errno import EEXIST
        if e.errno != EEXIST:
            raise


def update_checksum(checksum, obj):
    """Feed *obj* into the hash object *checksum*.

    Text strings are encoded as UTF-8 first; anything else is passed to
    ``checksum.update`` unchanged.
    """
    if isinstance(obj, six.text_type):
        checksum.update(obj.encode("utf8"))
    else:
        checksum.update(obj)


def _tracks_stacklevel(cls, exclude=frozenset(["__init__"])):
    """Changes all the methods of `cls` to track the call stack level in a member
    called `_stacklevel`.

    Only callables found directly in ``cls.__dict__`` whose names are not in
    *exclude* are wrapped. Returns *cls* so this can be used as a class
    decorator.
    """
    def make_wrapper(f):
        @functools.wraps(f)
        def wrapper(obj, *args, **kwargs):
            assert obj._stacklevel >= 0, obj._stacklevel
            # Increment by 2 because the method is wrapped.
            obj._stacklevel += 2
            try:
                return f(obj, *args, **kwargs)
            finally:
                obj._stacklevel -= 2

        return wrapper

    # Iterate over a snapshot: the loop body rebinds class attributes.
    for member in list(cls.__dict__):
        if member in exclude:
            continue

        f = getattr(cls, member)

        if not six.callable(f):
            continue

        setattr(cls, member, make_wrapper(f))

    return cls


# {{{ cleanup managers

class CleanupBase(object):
    """Common base class of the objects registered with a
    :class:`CleanupManager`."""
    pass


class CleanupManager(CleanupBase):
    """Collects cleanup objects and runs them most-recently-registered first."""

    def __init__(self):
        self.cleanups = []

    def register(self, c):
        """Register *c*; it is placed ahead of earlier registrations."""
        self.cleanups.insert(0, c)

    def clean_up(self):
        """Call ``clean_up()`` on every registered object."""
        for c in self.cleanups:
            c.clean_up()

    def error_clean_up(self):
        """Call ``error_clean_up()`` on every registered object."""
        for c in self.cleanups:
            c.error_clean_up()


class LockManager(CleanupBase):
    """Holds an exclusive lock represented by the existence of *lock_file*.

    The constructor retries once per second until the file can be created
    exclusively, and registers itself with *cleanup_m* once it succeeds.

    :arg cleanup_m: the :class:`CleanupManager` to register with
    :arg lock_file: path of the lock file
    :arg _stacklevel: extra stack level used for the warning issued while
        waiting
    :raises RuntimeError: after more than three minutes of waiting
    """

    def __init__(self, cleanup_m, lock_file, _stacklevel=1):
        self.lock_file = lock_file

        attempts = 0
        while True:
            try:
                # O_EXCL makes creation fail if the file already exists.
                self.fd = os.open(self.lock_file,
                        os.O_CREAT | os.O_WRONLY | os.O_EXCL)
                break
            except OSError:
                pass

            from time import sleep
            sleep(1)

            attempts += 1

            if attempts > 10:
                from warnings import warn
                warn("could not obtain lock--delete '%s' if necessary"
                        % self.lock_file,
                        stacklevel=1 + _stacklevel)
            if attempts > 3 * 60:
                raise RuntimeError("waited more than three minutes "
                        "on the lock file '%s'"
                        "--something is wrong" % self.lock_file)

        cleanup_m.register(self)

    def clean_up(self):
        """Release the lock: close the descriptor and delete the lock file."""
        # NOTE(review): the function-local import is kept from the original;
        # presumably it guards against module globals being unavailable.
        import os
        os.close(self.fd)
        os.unlink(self.lock_file)

    def error_clean_up(self):
        """Nothing extra to do on error; ``clean_up`` releases the lock."""
        pass


class ItemDirManager(CleanupBase):
    """Manages the directory of a single dictionary entry.

    :arg cleanup_m: the :class:`CleanupManager` to register with
    :arg path: the entry's directory
    :arg delete_on_error: if true, the directory is removed by
        :meth:`error_clean_up`

    ``existed`` records whether *path* was a directory at construction time.
    """

    def __init__(self, cleanup_m, path, delete_on_error):
        from os.path import isdir

        self.existed = isdir(path)
        self.path = path
        self.delete_on_error = delete_on_error

        cleanup_m.register(self)

    def reset(self):
        """Remove the directory tree; a missing directory is not an error."""
        try:
            shutil.rmtree(self.path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def mkdir(self):
        """Create the directory; an existing one is not an error."""
        from os import mkdir
        try:
            mkdir(self.path)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def clean_up(self):
        """Nothing to do on regular cleanup."""
        pass

    def error_clean_up(self):
        """Remove the directory if *delete_on_error* was requested."""
        if self.delete_on_error:
            self.reset()

# }}}


# {{{ key generation

class KeyBuilder(object):
    """Computes hex digests for keys that are stable across interpreter runs.

    Calling an instance with a key returns the hex digest. Support for more
    types is added by defining ``update_for_<type name>(key_hash, key)``
    methods, or by giving key objects an
    ``update_persistent_hash(key_hash, key_builder)`` method.
    """

    def rec(self, key_hash, key):
        """Update *key_hash* with the digest of *key*.

        The digest is taken, in this order, from a cached
        ``_pytools_persistent_hash_digest`` attribute on *key*, from
        ``key.update_persistent_hash``, or from the ``update_for_*`` method
        matching the name of ``type(key)``.

        :raises TypeError: if none of these are available for *key*
        """
        digest = None

        try:
            digest = key._pytools_persistent_hash_digest
        except AttributeError:
            pass

        if digest is None:
            try:
                method = key.update_persistent_hash
            except AttributeError:
                pass
            else:
                inner_key_hash = new_hash()
                method(inner_key_hash, self)
                digest = inner_key_hash.digest()

        if digest is None:
            try:
                method = getattr(self, "update_for_"+type(key).__name__)
            except AttributeError:
                pass
            else:
                inner_key_hash = new_hash()
                method(inner_key_hash, key)
                digest = inner_key_hash.digest()

        if digest is None:
            raise TypeError("unsupported type for persistent hash keying: %s"
                    % type(key))

        if not isinstance(key, type):
            # Best-effort caching of the digest on the key object; objects
            # that do not accept new attributes are simply not cached.
            try:
                key._pytools_persistent_hash_digest = digest
            except Exception:
                pass

        key_hash.update(digest)

    def __call__(self, key):
        """Return the hex digest identifying *key*."""
        key_hash = new_hash()
        self.rec(key_hash, key)
        return key_hash.hexdigest()

    # {{{ updaters

    def update_for_int(self, key_hash, key):
        key_hash.update(str(key).encode("utf8"))

    update_for_long = update_for_int
    update_for_bool = update_for_int

    def update_for_float(self, key_hash, key):
        key_hash.update(repr(key).encode("utf8"))

    if sys.version_info >= (3,):
        def update_for_str(self, key_hash, key):
            key_hash.update(key.encode('utf8'))

        def update_for_bytes(self, key_hash, key):
            key_hash.update(key)
    else:
        def update_for_str(self, key_hash, key):
            key_hash.update(key)

        def update_for_unicode(self, key_hash, key):
            key_hash.update(key.encode('utf8'))

    def update_for_tuple(self, key_hash, key):
        for obj_i in key:
            self.rec(key_hash, obj_i)

    def update_for_frozenset(self, key_hash, key):
        # Sorted so that the digest does not depend on set iteration order.
        for set_key in sorted(key):
            self.rec(key_hash, set_key)

    def update_for_NoneType(self, key_hash, key):  # noqa
        key_hash.update("<None>".encode('utf8'))

    def update_for_dtype(self, key_hash, key):
        key_hash.update(key.str.encode('utf8'))

    # }}}

# }}}


# {{{ lru cache

class _LinkedList(object):
    """The list operates on nodes of the form [value, leftptr, rightptr]. To
    create a node of this form you can use :meth:`new_node`.

    Supports inserting at the left and deleting from an arbitrary location.
    """
    def __init__(self):
        self.count = 0
        self.head = None
        self.end = None

    @staticmethod
    def new_node(element):
        """Return an unlinked node holding *element*."""
        return [element, None, None]

    def __len__(self):
        return self.count

    def appendleft_node(self, node):
        """Insert the unlinked *node* at the head of the list."""
        self.count += 1

        if self.head is None:
            self.head = self.end = node
            return

        self.head[1] = node
        node[2] = self.head

        self.head = node

    def pop_node(self):
        """Unlink and return the node at the end of the list."""
        end = self.end
        self.remove_node(end)
        return end

    def remove_node(self, node):
        """Unlink *node*, which must be in the list, and clear its pointers."""
        self.count -= 1

        if self.head is self.end:
            # Single-element list: *node* must be that element.
            assert node is self.head
            self.head = self.end = None
            return

        left = node[1]
        right = node[2]

        if left is None:
            self.head = right
        else:
            left[2] = right

        if right is None:
            self.end = left
        else:
            right[1] = left

        node[1] = node[2] = None


class _LRUCache(_MutableMapping):
    """A mapping that keeps at most *maxsize* items with an LRU replacement policy.
    """
    def __init__(self, maxsize):
        self.lru_order = _LinkedList()
        self.maxsize = maxsize
        self.cache = {}

    def __delitem__(self, item):
        node = self.cache[item]
        self.lru_order.remove_node(node)
        del self.cache[item]

    def __getitem__(self, item):
        node = self.cache[item]
        # Move the node to the front to mark it as most recently used.
        self.lru_order.remove_node(node)
        self.lru_order.appendleft_node(node)
        # A linked list node contains a tuple of the form (item, value).
        return node[0][1]

    def __contains__(self, item):
        return item in self.cache

    def __iter__(self):
        return iter(self.cache)

    def __len__(self):
        return len(self.cache)

    def clear(self):
        self.cache.clear()
        self.lru_order = _LinkedList()

    def __setitem__(self, item, value):
        """Store *value* under *item* and mark it as most recently used.

        Nothing is stored if *maxsize* is less than one. If the cache is full,
        the least recently used entry is evicted first.
        """
        if self.maxsize < 1:
            return

        try:
            node = self.cache[item]
        except KeyError:
            if len(self.lru_order) >= self.maxsize:
                # Make room for new elements.
                end_node = self.lru_order.pop_node()
                del self.cache[end_node[0][0]]

            node = self.lru_order.new_node((item, value))
            self.cache[item] = node
        else:
            self.lru_order.remove_node(node)
            # Replace the payload so that an existing key takes the new value
            # instead of silently keeping the old one.
            node[0] = (item, value)

        self.lru_order.appendleft_node(node)

        assert len(self.cache) == len(self.lru_order), \
                (len(self.cache), len(self.lru_order))
        assert len(self.lru_order) <= self.maxsize

        return node[0]

# }}}


# {{{ top-level

class NoSuchEntryError(KeyError):
    """Raised when a key has no (valid) entry in the dictionary."""
    pass


class ReadOnlyEntryError(KeyError):
    """Raised when an existing entry of a write-once dictionary would be
    overwritten."""
    pass


class CollisionWarning(UserWarning):
    """Issued when the key stored for a digest differs from the key given."""
    pass


@_tracks_stacklevel
class _PersistentDictBase(object):
    """Shared path handling, pickling and warning logic of the disk-backed
    dictionaries.

    Each entry is a directory named after the key's hex digest inside
    *container_dir*, holding a ``key`` and a ``contents`` file; the lock file
    for an entry is ``<hexdigest>.lock`` in *container_dir*.
    """

    def __init__(self, identifier, key_builder=None, container_dir=None):
        """
        :arg identifier: a file-name-compatible string identifying this
            dictionary
        :arg key_builder: a :class:`KeyBuilder` instance; a default one is
            created if *None*
        :arg container_dir: directory holding the entries; if *None*, a
            directory under the ``appdirs`` user cache directory is used
        """
        # for issuing warnings
        self._stacklevel = 0

        self.identifier = identifier

        if key_builder is None:
            key_builder = KeyBuilder()

        self.key_builder = key_builder

        from os.path import join
        if container_dir is None:
            import appdirs
            container_dir = join(
                    appdirs.user_cache_dir("pytools", "pytools"),
                    "pdict-v2-%s-py%s" % (
                        identifier,
                        ".".join(str(i) for i in sys.version_info),))

        self.container_dir = container_dir

        self._make_container_dir()

    def _warn(self, msg, category=UserWarning):
        """Issue a warning using the stack level tracked by the wrappers
        installed by :func:`_tracks_stacklevel`."""
        from warnings import warn
        warn(msg, category, stacklevel=1 + self._stacklevel)

    def store_if_not_present(self, key, value):
        """Store *value* under *key* unless an entry already exists."""
        self.store(key, value, _skip_if_present=True)

    def store(self, key, value, _skip_if_present=False):
        raise NotImplementedError()

    def fetch(self, key):
        raise NotImplementedError()

    def _read(self, path):
        """Return the object unpickled from *path*."""
        # NOTE: unpickling executes code from the file; the cache directory
        # must only contain trusted data.
        from six.moves.cPickle import load
        with open(path, "rb") as inf:
            return load(inf)

    def _write(self, path, value):
        """Pickle *value* into *path* using the highest protocol."""
        from six.moves.cPickle import dump, HIGHEST_PROTOCOL
        with open(path, "wb") as outf:
            dump(value, outf, protocol=HIGHEST_PROTOCOL)

    def _item_dir(self, hexdigest_key):
        from os.path import join
        return join(self.container_dir, hexdigest_key)

    def _key_file(self, hexdigest_key):
        from os.path import join
        return join(self._item_dir(hexdigest_key), "key")

    def _contents_file(self, hexdigest_key):
        from os.path import join
        return join(self._item_dir(hexdigest_key), "contents")

    def _lock_file(self, hexdigest_key):
        from os.path import join
        return join(self.container_dir, str(hexdigest_key) + ".lock")

    def _make_container_dir(self):
        _make_dir_recursively(self.container_dir)

    def _collision_check(self, key, stored_key):
        """Warn and raise if *stored_key* is not equal to *key*.

        :raises NoSuchEntryError: if the keys differ, after a
            :class:`CollisionWarning` has been issued
        """
        if stored_key != key:
            # Key collision, oh well.
            self._warn("%s: key collision in cache at '%s' -- these are "
                    "sufficiently unlikely that they're often "
                    "indicative of a broken hash key implementation "
                    "(that is not considering some elements relevant "
                    "for equality comparison)"
                    % (self.identifier, self.container_dir),
                    CollisionWarning)

            # To see what is actually non-equal, set a debugger breakpoint
            # here and step through ``stored_key == key``.
            raise NoSuchEntryError(key)

    def __getitem__(self, key):
        return self.fetch(key)

    def __setitem__(self, key, value):
        self.store(key, value)

    def __delitem__(self, key):
        raise NotImplementedError()

    def clear(self):
        """Remove all entries by deleting and recreating the container
        directory."""
        try:
            shutil.rmtree(self.container_dir)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

        self._make_container_dir()


@_tracks_stacklevel
class WriteOncePersistentDict(_PersistentDictBase):
    """A concurrent disk-backed dictionary that disallows overwriting/deletion.

    Compared with :class:`PersistentDict`, this class has faster
    retrieval times.

    .. automethod:: __init__
    .. automethod:: __getitem__
    .. automethod:: __setitem__
    .. automethod:: clear
    .. automethod:: store
    .. automethod:: store_if_not_present
    .. automethod:: fetch
    """
    def __init__(self, identifier, key_builder=None, container_dir=None,
             in_mem_cache_size=256):
        """
        :arg identifier: a file-name-compatible string identifying this
            dictionary
        :arg key_builder: a subclass of :class:`KeyBuilder`
        :arg in_mem_cache_size: retain an in-memory cache of up to
            *in_mem_cache_size* items
        """
        _PersistentDictBase.__init__(self, identifier, key_builder, container_dir)
        self._cache = _LRUCache(in_mem_cache_size)

    def _spin_until_removed(self, lock_file):
        """Wait, polling once per second, until *lock_file* no longer exists.

        :raises RuntimeError: after more than three minutes of waiting
        """
        from os.path import exists

        attempts = 0
        while exists(lock_file):
            from time import sleep
            sleep(1)

            attempts += 1

            if attempts > 10:
                self._warn("waiting until unlocked--delete '%s' if necessary"
                        % lock_file)

            if attempts > 3 * 60:
                raise RuntimeError("waited more than three minutes "
                        "on the lock file '%s'"
                        "--something is wrong" % lock_file)

    def store(self, key, value, _skip_if_present=False):
        """Store *value* under *key*.

        :raises ReadOnlyEntryError: if an entry for *key* already exists and
            *_skip_if_present* is false
        """
        hexdigest_key = self.key_builder(key)

        cleanup_m = CleanupManager()
        try:
            try:
                LockManager(cleanup_m, self._lock_file(hexdigest_key),
                        1 + self._stacklevel)
                item_dir_m = ItemDirManager(
                        cleanup_m, self._item_dir(hexdigest_key),
                        delete_on_error=False)

                if item_dir_m.existed:
                    if _skip_if_present:
                        return
                    raise ReadOnlyEntryError(key)

                item_dir_m.mkdir()

                key_path = self._key_file(hexdigest_key)
                value_path = self._contents_file(hexdigest_key)

                self._write(value_path, value)
                self._write(key_path, key)

                logger.debug("%s: disk cache store [key=%s]",
                        self.identifier, hexdigest_key)
            except Exception:
                cleanup_m.error_clean_up()
                raise
        finally:
            cleanup_m.clean_up()

    def fetch(self, key):
        """Return the value stored under *key*.

        The in-memory cache is consulted first; on a miss the entry is read
        from disk (after waiting for its lock file to disappear) and cached.

        :raises NoSuchEntryError: if there is no entry, the entry cannot be
            read, or the stored key differs from *key*
        """
        hexdigest_key = self.key_builder(key)

        # {{{ in memory cache

        try:
            stored_key, stored_value = self._cache[hexdigest_key]
        except KeyError:
            pass
        else:
            logger.debug("%s: in mem cache hit [key=%s]",
                    self.identifier, hexdigest_key)
            self._collision_check(key, stored_key)
            return stored_value

        # }}}

        # {{{ check path exists and is unlocked

        item_dir = self._item_dir(hexdigest_key)

        from os.path import isdir
        if not isdir(item_dir):
            logger.debug("%s: disk cache miss [key=%s]",
                    self.identifier, hexdigest_key)
            raise NoSuchEntryError(key)

        lock_file = self._lock_file(hexdigest_key)
        self._spin_until_removed(lock_file)

        # }}}

        key_file = self._key_file(hexdigest_key)
        contents_file = self._contents_file(hexdigest_key)

        # Note: Unlike PersistentDict, this doesn't autodelete invalid entries,
        # because that would lead to a race condition.

        # {{{ load key file and do equality check

        try:
            read_key = self._read(key_file)
        except Exception as e:
            self._warn("pytools.persistent_dict.WriteOncePersistentDict(%s) "
                    "encountered an invalid "
                    "key file for key %s. Remove the directory "
                    "'%s' if necessary. (caught: %s)"
                    % (self.identifier, hexdigest_key, item_dir, str(e)))
            raise NoSuchEntryError(key)

        self._collision_check(key, read_key)

        # }}}

        logger.debug("%s: disk cache hit [key=%s]",
                self.identifier, hexdigest_key)

        # {{{ load contents

        try:
            read_contents = self._read(contents_file)
        except Exception:
            self._warn("pytools.persistent_dict.WriteOncePersistentDict(%s) "
                    "encountered an invalid "
                    "contents file for key %s. Remove the directory "
                    "'%s' if necessary."
                    % (self.identifier, hexdigest_key, item_dir))
            raise NoSuchEntryError(key)

        # }}}

        self._cache[hexdigest_key] = (key, read_contents)
        return read_contents

    def clear(self):
        """Remove all entries from disk and from the in-memory cache."""
        _PersistentDictBase.clear(self)
        self._cache.clear()


@_tracks_stacklevel
class PersistentDict(_PersistentDictBase):
    """A concurrent disk-backed dictionary.

    .. automethod:: __init__
    .. automethod:: __getitem__
    .. automethod:: __setitem__
    .. automethod:: clear
    .. automethod:: store
    .. automethod:: store_if_not_present
    .. automethod:: fetch
    """
    def __init__(self, identifier, key_builder=None, container_dir=None):
        """
        :arg identifier: a file-name-compatible string identifying this
            dictionary
        :arg key_builder: a subclass of :class:`KeyBuilder`
        """
        _PersistentDictBase.__init__(self, identifier, key_builder, container_dir)

    def store(self, key, value, _skip_if_present=False):
        """Store *value* under *key*, replacing any existing entry unless
        *_skip_if_present* is true.

        If writing fails, the partially written entry is deleted.
        """
        hexdigest_key = self.key_builder(key)

        cleanup_m = CleanupManager()
        try:
            try:
                LockManager(cleanup_m, self._lock_file(hexdigest_key),
                        1 + self._stacklevel)
                item_dir_m = ItemDirManager(
                        cleanup_m, self._item_dir(hexdigest_key),
                        delete_on_error=True)

                if item_dir_m.existed:
                    if _skip_if_present:
                        return
                    item_dir_m.reset()

                item_dir_m.mkdir()

                key_path = self._key_file(hexdigest_key)
                value_path = self._contents_file(hexdigest_key)

                self._write(value_path, value)
                self._write(key_path, key)

                logger.debug("%s: cache store [key=%s]",
                        self.identifier, hexdigest_key)
            except Exception:
                cleanup_m.error_clean_up()
                raise
        finally:
            cleanup_m.clean_up()

    def fetch(self, key):
        """Return the value stored under *key*.

        An entry whose key or contents file cannot be read is deleted.

        :raises NoSuchEntryError: if there is no entry, the entry cannot be
            read, or the stored key differs from *key*
        """
        hexdigest_key = self.key_builder(key)
        item_dir = self._item_dir(hexdigest_key)

        from os.path import isdir
        if not isdir(item_dir):
            logger.debug("%s: cache miss [key=%s]",
                    self.identifier, hexdigest_key)
            raise NoSuchEntryError(key)

        cleanup_m = CleanupManager()
        try:
            try:
                LockManager(cleanup_m, self._lock_file(hexdigest_key),
                        1 + self._stacklevel)
                item_dir_m = ItemDirManager(
                        cleanup_m, item_dir, delete_on_error=False)

                key_path = self._key_file(hexdigest_key)
                value_path = self._contents_file(hexdigest_key)

                # {{{ load key

                try:
                    read_key = self._read(key_path)
                except Exception:
                    item_dir_m.reset()
                    self._warn("pytools.persistent_dict.PersistentDict(%s) "
                            "encountered an invalid "
                            "key file for key %s. Entry deleted."
                            % (self.identifier, hexdigest_key))
                    raise NoSuchEntryError(key)

                self._collision_check(key, read_key)

                # }}}

                logger.debug("%s: cache hit [key=%s]",
                        self.identifier, hexdigest_key)

                # {{{ load value

                try:
                    read_contents = self._read(value_path)
                except Exception:
                    item_dir_m.reset()
                    self._warn("pytools.persistent_dict.PersistentDict(%s) "
                            "encountered an invalid "
                            "contents file for key %s. Entry deleted."
                            % (self.identifier, hexdigest_key))
                    raise NoSuchEntryError(key)

                return read_contents

                # }}}

            except Exception:
                cleanup_m.error_clean_up()
                raise
        finally:
            cleanup_m.clean_up()

    def remove(self, key):
        """Delete the entry stored under *key*.

        :raises NoSuchEntryError: if there is no entry, its key file cannot be
            read (the entry is deleted in that case), or the stored key
            differs from *key*
        """
        hexdigest_key = self.key_builder(key)

        item_dir = self._item_dir(hexdigest_key)
        from os.path import isdir
        if not isdir(item_dir):
            raise NoSuchEntryError(key)

        cleanup_m = CleanupManager()
        try:
            try:
                LockManager(cleanup_m, self._lock_file(hexdigest_key),
                        1 + self._stacklevel)
                item_dir_m = ItemDirManager(
                        cleanup_m, item_dir, delete_on_error=False)
                key_file = self._key_file(hexdigest_key)

                # {{{ load key

                try:
                    read_key = self._read(key_file)
                except Exception:
                    item_dir_m.reset()
                    self._warn("pytools.persistent_dict.PersistentDict(%s) "
                            "encountered an invalid "
                            "key file for key %s. Entry deleted."
                            % (self.identifier, hexdigest_key))
                    raise NoSuchEntryError(key)

                self._collision_check(key, read_key)

                # }}}

                item_dir_m.reset()

            except Exception:
                cleanup_m.error_clean_up()
                raise
        finally:
            cleanup_m.clean_up()

    def __delitem__(self, key):
        self.remove(key)

# }}}

# vim: foldmethod=marker