1"""Generic persistent, concurrent dictionary-like facility."""
2
3from __future__ import division, with_statement, absolute_import
4
5__copyright__ = """
6Copyright (C) 2011,2014 Andreas Kloeckner
7Copyright (C) 2017 Matt Wala
8"""
9
10__license__ = """
11Permission is hereby granted, free of charge, to any person obtaining a copy
12of this software and associated documentation files (the "Software"), to deal
13in the Software without restriction, including without limitation the rights
14to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15copies of the Software, and to permit persons to whom the Software is
16furnished to do so, subject to the following conditions:
17
18The above copyright notice and this permission notice shall be included in
19all copies or substantial portions of the Software.
20
21THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27THE SOFTWARE.
28"""
29
30import logging
31logger = logging.getLogger(__name__)
32
33
34import collections
35import functools
36import six
37import sys
38import os
39import shutil
40import errno
41
42__doc__ = """
43Persistent Hashing and Persistent Dictionaries
44==============================================
45
46This module contains functionality that allows hashing with keys that remain
47valid across interpreter invocations, unlike Python's built-in hashes.
48
49This module also provides a disk-backed dictionary that uses persistent hashing.
50
51.. autoexception:: NoSuchEntryError
52.. autoexception:: ReadOnlyEntryError
53
54.. autoexception:: CollisionWarning
55
56.. autoclass:: KeyBuilder
57.. autoclass:: PersistentDict
58.. autoclass:: WriteOncePersistentDict
59"""
60
# Hash factory used for all persistent key digests.
try:
    import hashlib
    new_hash = hashlib.sha256
except ImportError:
    # for Python < 2.5, which lacks hashlib
    import sha
    new_hash = sha.new
68
69
70def _make_dir_recursively(dir):
71    try:
72        os.makedirs(dir)
73    except OSError as e:
74        from errno import EEXIST
75        if e.errno != EEXIST:
76            raise
77
78
def update_checksum(checksum, obj):
    """Feed *obj* into the hash object *checksum*.

    Text strings are UTF-8 encoded before hashing; byte strings are hashed
    as-is.
    """
    data = obj.encode("utf8") if isinstance(obj, six.text_type) else obj
    checksum.update(data)
84
85
def _tracks_stacklevel(cls, exclude=frozenset(["__init__"])):
    """Class decorator: wrap every callable attribute of *cls* (except those
    named in *exclude*) so that it maintains the current call depth in the
    instance attribute ``_stacklevel`` while it runs.
    """
    def _instrument(method):
        @functools.wraps(method)
        def tracked(obj, *args, **kwargs):
            assert obj._stacklevel >= 0, obj._stacklevel
            # Increment by 2 because the method is wrapped.
            obj._stacklevel += 2
            try:
                return method(obj, *args, **kwargs)
            finally:
                obj._stacklevel -= 2

        return tracked

    for name in list(cls.__dict__):
        if name in exclude:
            continue

        attr = getattr(cls, name)
        if six.callable(attr):
            setattr(cls, name, _instrument(attr))

    return cls
115
116
117# {{{ cleanup managers
118
class CleanupBase(object):
    """Base marker class for cleanup actions registered with
    :class:`CleanupManager` (see ``clean_up``/``error_clean_up`` on
    subclasses).
    """
    pass
121
122
class CleanupManager(CleanupBase):
    """Collects cleanup actions and runs them in LIFO order (the most
    recently registered action runs first).
    """

    def __init__(self):
        self.cleanups = []

    def register(self, c):
        # Prepend, so that later registrations get cleaned up first.
        self.cleanups.insert(0, c)

    def clean_up(self):
        # Unconditional cleanup, run on both success and failure paths.
        for cleanup in self.cleanups:
            cleanup.clean_up()

    def error_clean_up(self):
        # Extra cleanup that only runs when an error occurred.
        for cleanup in self.cleanups:
            cleanup.error_clean_up()
137
138
class LockManager(CleanupBase):
    """Inter-process lock based on atomically creating *lock_file*.

    Spins until the file can be created, warning after ten attempts and
    raising :exc:`RuntimeError` after roughly three minutes.
    """

    def __init__(self, cleanup_m, lock_file, _stacklevel=1):
        self.lock_file = lock_file

        tries = 0
        while True:
            try:
                # O_EXCL makes creation atomic: exactly one contender wins.
                self.fd = os.open(self.lock_file,
                        os.O_CREAT | os.O_WRONLY | os.O_EXCL)
            except OSError:
                pass
            else:
                break

            from time import sleep
            sleep(1)

            tries += 1

            if tries > 10:
                from warnings import warn
                warn("could not obtain lock--delete '%s' if necessary"
                        % self.lock_file,
                     stacklevel=1 + _stacklevel)
            if tries > 3 * 60:
                raise RuntimeError("waited more than three minutes "
                        "on the lock file '%s'"
                        "--something is wrong" % self.lock_file)

        cleanup_m.register(self)

    def clean_up(self):
        # Release the lock by removing the lock file.
        os.close(self.fd)
        os.unlink(self.lock_file)

    def error_clean_up(self):
        # Nothing extra to do on error--clean_up() still runs via the
        # callers' `finally` blocks and releases the lock.
        pass
176
177
class ItemDirManager(CleanupBase):
    """Manages the directory holding a single cache entry.

    Records whether the directory already existed at construction time and,
    if *delete_on_error* is set, removes it again when an error occurs.
    """

    def __init__(self, cleanup_m, path, delete_on_error):
        from os.path import isdir

        self.path = path
        self.delete_on_error = delete_on_error
        # Remember whether the entry was already present on disk.
        self.existed = isdir(path)

        cleanup_m.register(self)

    def reset(self):
        """Delete the item directory; a missing directory is not an error."""
        try:
            shutil.rmtree(self.path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def mkdir(self):
        """Create the item directory; an existing directory is not an error."""
        try:
            os.mkdir(self.path)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def clean_up(self):
        # The directory is the cache entry itself--never removed on success.
        pass

    def error_clean_up(self):
        if self.delete_on_error:
            self.reset()
209
210# }}}
211
212
213# {{{ key generation
214
class KeyBuilder(object):
    """Computes hash digests for keys that remain valid across interpreter
    invocations.

    Dispatch order in :meth:`rec`: a digest previously cached on the object,
    then the object's own ``update_persistent_hash`` method, then a
    type-specific ``update_for_<typename>`` method on this builder.
    """

    def rec(self, key_hash, key):
        """Update the hash object *key_hash* with a digest for *key*.

        :raises TypeError: if no hashing strategy applies to *key*.
        """
        digest = None

        # 1) Reuse a digest cached on the object by a previous call.
        try:
            digest = key._pytools_persistent_hash_digest
        except AttributeError:
            pass

        # 2) Let the object hash itself via update_persistent_hash.
        if digest is None:
            try:
                method = key.update_persistent_hash
            except AttributeError:
                pass
            else:
                inner_key_hash = new_hash()
                method(inner_key_hash, self)
                digest = inner_key_hash.digest()

        # 3) Fall back to a type-based updater defined on this builder.
        if digest is None:
            try:
                method = getattr(self, "update_for_"+type(key).__name__)
            except AttributeError:
                pass
            else:
                inner_key_hash = new_hash()
                method(inner_key_hash, key)
                digest = inner_key_hash.digest()

        if digest is None:
            raise TypeError("unsupported type for persistent hash keying: %s"
                    % type(key))

        # Cache the digest on the object for future calls. Classes are
        # skipped, and objects that reject new attributes (e.g. __slots__)
        # are silently left uncached.
        if not isinstance(key, type):
            try:
                key._pytools_persistent_hash_digest = digest
            except Exception:
                pass

        key_hash.update(digest)

    def __call__(self, key):
        """Return a hex digest string persistently identifying *key*."""
        key_hash = new_hash()
        self.rec(key_hash, key)
        return key_hash.hexdigest()

    # {{{ updaters

    def update_for_int(self, key_hash, key):
        key_hash.update(str(key).encode("utf8"))

    update_for_long = update_for_int
    update_for_bool = update_for_int

    def update_for_float(self, key_hash, key):
        # repr() preserves full float precision.
        key_hash.update(repr(key).encode("utf8"))

    # str/bytes handling differs between Python 2 and 3.
    if sys.version_info >= (3,):
        def update_for_str(self, key_hash, key):
            key_hash.update(key.encode('utf8'))

        def update_for_bytes(self, key_hash, key):
            key_hash.update(key)
    else:
        def update_for_str(self, key_hash, key):
            key_hash.update(key)

        def update_for_unicode(self, key_hash, key):
            key_hash.update(key.encode('utf8'))

    def update_for_tuple(self, key_hash, key):
        for obj_i in key:
            self.rec(key_hash, obj_i)

    def update_for_frozenset(self, key_hash, key):
        # Sort for a deterministic order independent of set iteration order.
        for set_key in sorted(key):
            self.rec(key_hash, set_key)

    def update_for_NoneType(self, key_hash, key):  # noqa
        key_hash.update("<None>".encode('utf8'))

    def update_for_dtype(self, key_hash, key):
        # numpy dtypes: hash the array-interface type string (e.g. '<f8').
        key_hash.update(key.str.encode('utf8'))

    # }}}
300
301# }}}
302
303
304# {{{ lru cache
305
306class _LinkedList(object):
307    """The list operates on nodes of the form [value, leftptr, rightpr]. To create a
308    node of this form you can use `LinkedList.new_node().`
309
310    Supports inserting at the left and deleting from an arbitrary location.
311    """
312    def __init__(self):
313        self.count = 0
314        self.head = None
315        self.end = None
316
317    @staticmethod
318    def new_node(element):
319        return [element, None, None]
320
321    def __len__(self):
322        return self.count
323
324    def appendleft_node(self, node):
325        self.count += 1
326
327        if self.head is None:
328            self.head = self.end = node
329            return
330
331        self.head[1] = node
332        node[2] = self.head
333
334        self.head = node
335
336    def pop_node(self):
337        end = self.end
338        self.remove_node(end)
339        return end
340
341    def remove_node(self, node):
342        self.count -= 1
343
344        if self.head is self.end:
345            assert node is self.head
346            self.head = self.end = None
347            return
348
349        left = node[1]
350        right = node[2]
351
352        if left is None:
353            self.head = right
354        else:
355            left[2] = right
356
357        if right is None:
358            self.end = left
359        else:
360            right[1] = left
361
362        node[1] = node[2] = None
363
364
try:
    from collections.abc import MutableMapping as _MutableMapping
except ImportError:
    # Python 2: the abstract base classes live directly in `collections`.
    # (The `collections.MutableMapping` alias was removed in Python 3.10.)
    from collections import MutableMapping as _MutableMapping


class _LRUCache(_MutableMapping):
    """A mapping that keeps at most *maxsize* items with an LRU replacement
    policy.
    """
    def __init__(self, maxsize):
        self.lru_order = _LinkedList()
        self.maxsize = maxsize
        # Maps item -> linked-list node; the node's value is (item, value).
        self.cache = {}

    def __delitem__(self, item):
        node = self.cache[item]
        self.lru_order.remove_node(node)
        del self.cache[item]

    def __getitem__(self, item):
        node = self.cache[item]
        # Move the accessed node to the front (most recently used).
        self.lru_order.remove_node(node)
        self.lru_order.appendleft_node(node)
        # A linked list node contains a tuple of the form (item, value).
        return node[0][1]

    def __contains__(self, item):
        return item in self.cache

    def __iter__(self):
        return iter(self.cache)

    def __len__(self):
        return len(self.cache)

    def clear(self):
        self.cache.clear()
        self.lru_order = _LinkedList()

    def __setitem__(self, item, value):
        if self.maxsize < 1:
            return

        try:
            node = self.cache[item]
            self.lru_order.remove_node(node)
            # Bug fix: overwrite the stored value. Previously the old value
            # was silently retained when an existing key was re-assigned.
            node[0] = (item, value)
        except KeyError:
            if len(self.lru_order) >= self.maxsize:
                # Make room for new elements: evict the least recently used.
                end_node = self.lru_order.pop_node()
                del self.cache[end_node[0][0]]

            node = self.lru_order.new_node((item, value))
            self.cache[item] = node

        self.lru_order.appendleft_node(node)

        assert len(self.cache) == len(self.lru_order), \
                (len(self.cache), len(self.lru_order))
        assert len(self.lru_order) <= self.maxsize

        return node[0]
422# }}}
423
424
425# {{{ top-level
426
class NoSuchEntryError(KeyError):
    """Raised when a requested key has no (valid) entry in the dictionary."""
    pass
429
430
class ReadOnlyEntryError(KeyError):
    """Raised on attempts to overwrite an existing entry in a
    write-once dictionary.
    """
    pass
433
434
class CollisionWarning(UserWarning):
    """Warned when two unequal keys hash to the same persistent digest."""
    pass
437
438
@_tracks_stacklevel
class _PersistentDictBase(object):
    """Common base for the disk-backed dictionaries: key hashing, on-disk
    layout (one directory per entry with ``key`` and ``contents`` files),
    pickling, and collision checking.
    """
    def __init__(self, identifier, key_builder=None, container_dir=None):
        # for issuing warnings
        self._stacklevel = 0

        self.identifier = identifier

        if key_builder is None:
            key_builder = KeyBuilder()

        self.key_builder = key_builder

        from os.path import join
        if container_dir is None:
            # Default to a per-user, per-Python-version cache directory.
            import appdirs
            container_dir = join(
                    appdirs.user_cache_dir("pytools", "pytools"),
                    "pdict-v2-%s-py%s" % (
                        identifier,
                        ".".join(str(i) for i in sys.version_info),))

        self.container_dir = container_dir

        self._make_container_dir()

    def _warn(self, msg, category=UserWarning):
        from warnings import warn
        # _stacklevel is maintained by the _tracks_stacklevel decorator so
        # that warnings point at the user's call site.
        warn(msg, category, stacklevel=1 + self._stacklevel)

    def store_if_not_present(self, key, value):
        """Store *value* for *key*, silently doing nothing if an entry
        already exists.
        """
        self.store(key, value, _skip_if_present=True)

    def store(self, key, value, _skip_if_present=False):
        raise NotImplementedError()

    def fetch(self, key):
        raise NotImplementedError()

    def _read(self, path):
        # Entries are stored pickled; see _write.
        from six.moves.cPickle import load
        with open(path, "rb") as inf:
            return load(inf)

    def _write(self, path, value):
        from six.moves.cPickle import dump, HIGHEST_PROTOCOL
        with open(path, "wb") as outf:
            dump(value, outf, protocol=HIGHEST_PROTOCOL)

    def _item_dir(self, hexdigest_key):
        from os.path import join
        return join(self.container_dir, hexdigest_key)

    def _key_file(self, hexdigest_key):
        from os.path import join
        return join(self._item_dir(hexdigest_key), "key")

    def _contents_file(self, hexdigest_key):
        from os.path import join
        return join(self._item_dir(hexdigest_key), "contents")

    def _lock_file(self, hexdigest_key):
        from os.path import join
        return join(self.container_dir, str(hexdigest_key) + ".lock")

    def _make_container_dir(self):
        _make_dir_recursively(self.container_dir)

    def _collision_check(self, key, stored_key):
        """Warn and raise :exc:`NoSuchEntryError` if *stored_key* (read back
        from disk or the in-memory cache) is not equal to *key*.
        """
        if stored_key != key:
            # Key collision, oh well.
            self._warn("%s: key collision in cache at '%s' -- these are "
                    "sufficiently unlikely that they're often "
                    "indicative of a broken hash key implementation "
                    "(that is not considering some elements relevant "
                    "for equality comparison)"
                    % (self.identifier, self.container_dir),
                    CollisionWarning)

            # This no-op comparison is kept as a convenient spot for a
            # debugger breakpoint, to step through the equality comparison
            # and see what is actually non-equal.
            # (Removed: a stray `import pudb; pudb.set_trace()` that must
            # not ship in library code.)
            stored_key == key
            raise NoSuchEntryError(key)

    def __getitem__(self, key):
        return self.fetch(key)

    def __setitem__(self, key, value):
        self.store(key, value)

    def __delitem__(self, key):
        raise NotImplementedError()

    def clear(self):
        """Remove all entries by deleting the container directory."""
        try:
            shutil.rmtree(self.container_dir)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

        self._make_container_dir()
541
542
@_tracks_stacklevel
class WriteOncePersistentDict(_PersistentDictBase):
    """A concurrent disk-backed dictionary that disallows overwriting/deletion.

    Compared with :class:`PersistentDict`, this class has faster
    retrieval times.

    .. automethod:: __init__
    .. automethod:: __getitem__
    .. automethod:: __setitem__
    .. automethod:: clear
    .. automethod:: store
    .. automethod:: store_if_not_present
    .. automethod:: fetch
    """
    def __init__(self, identifier, key_builder=None, container_dir=None,
             in_mem_cache_size=256):
        """
        :arg identifier: a file-name-compatible string identifying this
            dictionary
        :arg key_builder: a subclass of :class:`KeyBuilder`
        :arg in_mem_cache_size: retain an in-memory cache of up to
            *in_mem_cache_size* items
        """
        _PersistentDictBase.__init__(self, identifier, key_builder, container_dir)
        self._cache = _LRUCache(in_mem_cache_size)

    def _spin_until_removed(self, lock_file):
        """Block until *lock_file* no longer exists, warning after ten
        seconds and raising :exc:`RuntimeError` after about three minutes.
        """
        from os.path import exists

        attempts = 0
        while exists(lock_file):
            from time import sleep
            sleep(1)

            attempts += 1

            if attempts > 10:
                self._warn("waiting until unlocked--delete '%s' if necessary"
                        % lock_file)

            if attempts > 3 * 60:
                raise RuntimeError("waited more than three minutes "
                        "on the lock file '%s'"
                        "--something is wrong" % lock_file)

    def store(self, key, value, _skip_if_present=False):
        """Store *value* under *key*.

        :raises ReadOnlyEntryError: if an entry for *key* already exists
            (unless *_skip_if_present* is set, in which case the call is a
            silent no-op).
        """
        hexdigest_key = self.key_builder(key)

        cleanup_m = CleanupManager()
        try:
            try:
                # Pass the stack level so lock-contention warnings point at
                # the caller--consistent with PersistentDict.store.
                LockManager(cleanup_m, self._lock_file(hexdigest_key),
                        1 + self._stacklevel)
                item_dir_m = ItemDirManager(
                        cleanup_m, self._item_dir(hexdigest_key),
                        delete_on_error=False)

                if item_dir_m.existed:
                    if _skip_if_present:
                        return
                    raise ReadOnlyEntryError(key)

                item_dir_m.mkdir()

                key_path = self._key_file(hexdigest_key)
                value_path = self._contents_file(hexdigest_key)

                # Contents are written before the key--presumably so that a
                # reader which sees the key file finds the contents already
                # in place.
                self._write(value_path, value)
                self._write(key_path, key)

                logger.debug("%s: disk cache store [key=%s]" % (
                        self.identifier, hexdigest_key))
            except Exception:
                cleanup_m.error_clean_up()
                raise
        finally:
            cleanup_m.clean_up()

    def fetch(self, key):
        """Return the value stored for *key*.

        :raises NoSuchEntryError: if no (valid) entry exists for *key*.
        """
        hexdigest_key = self.key_builder(key)

        # {{{ in memory cache

        try:
            stored_key, stored_value = self._cache[hexdigest_key]
        except KeyError:
            pass
        else:
            logger.debug("%s: in mem cache hit [key=%s]" % (
                    self.identifier, hexdigest_key))
            self._collision_check(key, stored_key)
            return stored_value

        # }}}

        # {{{ check path exists and is unlocked

        item_dir = self._item_dir(hexdigest_key)

        from os.path import isdir
        if not isdir(item_dir):
            logger.debug("%s: disk cache miss [key=%s]" % (
                    self.identifier, hexdigest_key))
            raise NoSuchEntryError(key)

        lock_file = self._lock_file(hexdigest_key)
        self._spin_until_removed(lock_file)

        # }}}

        key_file = self._key_file(hexdigest_key)
        contents_file = self._contents_file(hexdigest_key)

        # Note: Unlike PersistentDict, this doesn't autodelete invalid entires,
        # because that would lead to a race condition.

        # {{{ load key file and do equality check

        try:
            read_key = self._read(key_file)
        except Exception as e:
            self._warn("pytools.persistent_dict.WriteOncePersistentDict(%s) "
                    "encountered an invalid "
                    "key file for key %s. Remove the directory "
                    "'%s' if necessary. (caught: %s)"
                    % (self.identifier, hexdigest_key, item_dir, str(e)))
            raise NoSuchEntryError(key)

        self._collision_check(key, read_key)

        # }}}

        logger.debug("%s: disk cache hit [key=%s]" % (
                self.identifier, hexdigest_key))

        # {{{ load contents

        try:
            read_contents = self._read(contents_file)
        except Exception:
            # Message fixed: this branch reads the contents file, not the
            # key file.
            self._warn("pytools.persistent_dict.WriteOncePersistentDict(%s) "
                    "encountered an invalid "
                    "contents file for key %s. Remove the directory "
                    "'%s' if necessary."
                    % (self.identifier, hexdigest_key, item_dir))
            raise NoSuchEntryError(key)

        # }}}

        self._cache[hexdigest_key] = (key, read_contents)
        return read_contents

    def clear(self):
        """Remove all entries, on disk and in the in-memory cache."""
        _PersistentDictBase.clear(self)
        self._cache.clear()
698
699
@_tracks_stacklevel
class PersistentDict(_PersistentDictBase):
    """A concurrent disk-backed dictionary.

    .. automethod:: __init__
    .. automethod:: __getitem__
    .. automethod:: __setitem__
    .. automethod:: clear
    .. automethod:: store
    .. automethod:: store_if_not_present
    .. automethod:: fetch
    """
    def __init__(self, identifier, key_builder=None, container_dir=None):
        """
        :arg identifier: a file-name-compatible string identifying this
            dictionary
        :arg key_builder: a subclass of :class:`KeyBuilder`
        """
        _PersistentDictBase.__init__(self, identifier, key_builder, container_dir)

    def store(self, key, value, _skip_if_present=False):
        """Store *value* under *key*, overwriting any existing entry
        (unless *_skip_if_present* is set).
        """
        hexdigest_key = self.key_builder(key)

        cleanup_m = CleanupManager()
        try:
            try:
                LockManager(cleanup_m, self._lock_file(hexdigest_key),
                        1 + self._stacklevel)
                item_dir_m = ItemDirManager(
                        cleanup_m, self._item_dir(hexdigest_key),
                        delete_on_error=True)

                if item_dir_m.existed:
                    if _skip_if_present:
                        return
                    # Overwriting: clear out the old entry first.
                    item_dir_m.reset()

                item_dir_m.mkdir()

                key_path = self._key_file(hexdigest_key)
                value_path = self._contents_file(hexdigest_key)

                self._write(value_path, value)
                self._write(key_path, key)

                logger.debug("%s: cache store [key=%s]" % (
                        self.identifier, hexdigest_key))
            except Exception:
                cleanup_m.error_clean_up()
                raise
        finally:
            cleanup_m.clean_up()

    def fetch(self, key):
        """Return the value stored for *key*.

        Invalid (unreadable) entries are deleted and reported as missing.

        :raises NoSuchEntryError: if no (valid) entry exists for *key*.
        """
        hexdigest_key = self.key_builder(key)
        item_dir = self._item_dir(hexdigest_key)

        from os.path import isdir
        if not isdir(item_dir):
            logger.debug("%s: cache miss [key=%s]" % (
                    self.identifier, hexdigest_key))
            raise NoSuchEntryError(key)

        cleanup_m = CleanupManager()
        try:
            try:
                LockManager(cleanup_m, self._lock_file(hexdigest_key),
                        1 + self._stacklevel)
                item_dir_m = ItemDirManager(
                        cleanup_m, item_dir, delete_on_error=False)

                key_path = self._key_file(hexdigest_key)
                value_path = self._contents_file(hexdigest_key)

                # {{{ load key

                try:
                    read_key = self._read(key_path)
                except Exception:
                    item_dir_m.reset()
                    self._warn("pytools.persistent_dict.PersistentDict(%s) "
                            "encountered an invalid "
                            "key file for key %s. Entry deleted."
                            % (self.identifier, hexdigest_key))
                    raise NoSuchEntryError(key)

                self._collision_check(key, read_key)

                # }}}

                logger.debug("%s: cache hit [key=%s]" % (
                        self.identifier, hexdigest_key))

                # {{{ load value

                try:
                    read_contents = self._read(value_path)
                except Exception:
                    item_dir_m.reset()
                    # Message fixed: this branch reads the contents file,
                    # not the key file.
                    self._warn("pytools.persistent_dict.PersistentDict(%s) "
                            "encountered an invalid "
                            "contents file for key %s. Entry deleted."
                            % (self.identifier, hexdigest_key))
                    raise NoSuchEntryError(key)

                return read_contents

                # }}}

            except Exception:
                cleanup_m.error_clean_up()
                raise
        finally:
            cleanup_m.clean_up()

    def remove(self, key):
        """Delete the entry for *key*.

        :raises NoSuchEntryError: if no entry exists for *key*.
        """
        hexdigest_key = self.key_builder(key)

        item_dir = self._item_dir(hexdigest_key)
        from os.path import isdir
        if not isdir(item_dir):
            raise NoSuchEntryError(key)

        cleanup_m = CleanupManager()
        try:
            try:
                LockManager(cleanup_m, self._lock_file(hexdigest_key),
                        1 + self._stacklevel)
                item_dir_m = ItemDirManager(
                        cleanup_m, item_dir, delete_on_error=False)
                key_file = self._key_file(hexdigest_key)

                # {{{ load key

                try:
                    read_key = self._read(key_file)
                except Exception:
                    item_dir_m.reset()
                    self._warn("pytools.persistent_dict.PersistentDict(%s) "
                            "encountered an invalid "
                            "key file for key %s. Entry deleted."
                            % (self.identifier, hexdigest_key))
                    raise NoSuchEntryError(key)

                self._collision_check(key, read_key)

                # }}}

                item_dir_m.reset()

            except Exception:
                cleanup_m.error_clean_up()
                raise
        finally:
            cleanup_m.clean_up()

    def __delitem__(self, key):
        self.remove(key)
858
859# }}}
860
861# vim: foldmethod=marker
862