1# Copyright (C) 2008-2011 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""Persistent maps from tuple_of_strings->string using CHK stores.
18
19Overview and current status:
20
21The CHKMap class implements a dict from tuple_of_strings->string by using a trie
22with internal nodes of 8-bit fan out; The key tuples are mapped to strings by
23joining them by \x00, and \x00 padding shorter keys out to the length of the
24longest key. Leaf nodes are packed as densely as possible, and internal nodes
25are all an additional 8-bits wide leading to a sparse upper tree.
26
27Updates to a CHKMap are done preferentially via the apply_delta method, to
28allow optimisation of the update operation; but individual map/unmap calls are
29possible and supported. Individual changes via map/unmap are buffered in memory
30until the _save method is called to force serialisation of the tree.
31apply_delta records its changes immediately by performing an implicit _save.
32
33TODO:
34-----
35
36Densely packed upper nodes.
37
38"""
39
40import heapq
41import threading
42
43from .. import (
44    errors,
45    lru_cache,
46    osutils,
47    registry,
48    static_tuple,
49    trace,
50    )
51from ..static_tuple import StaticTuple
52
53# approx 4MB
54# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
55# out, it takes 3.1MB to cache the layer.
56_PAGE_CACHE_SIZE = 4 * 1024 * 1024
57# Per thread caches for 2 reasons:
58# - in the server we may be serving very different content, so we get less
59#   cache thrashing.
60# - we avoid locking on every cache lookup.
61_thread_caches = threading.local()
62# The page cache.
63_thread_caches.page_cache = None
64
65
66def _get_cache():
67    """Get the per-thread page cache.
68
69    We need a function to do this because in a new thread the _thread_caches
70    threading.local object does not have the cache initialized yet.
71    """
72    page_cache = getattr(_thread_caches, 'page_cache', None)
73    if page_cache is None:
74        # We are caching bytes so len(value) is perfectly accurate
75        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
76        _thread_caches.page_cache = page_cache
77    return page_cache
78
79
80def clear_cache():
81    _get_cache().clear()
82
83
84# If a ChildNode falls below this many bytes, we check for a remap
85_INTERESTING_NEW_SIZE = 50
86# If a ChildNode shrinks by more than this amount, we check for a remap
87_INTERESTING_SHRINKAGE_LIMIT = 20
88
89
90def _search_key_plain(key):
91    """Map the key tuple into a search string that just uses the key bytes."""
92    return b'\x00'.join(key)
93
94
95search_key_registry = registry.Registry()
96search_key_registry.register(b'plain', _search_key_plain)
97
98
99class CHKMap(object):
100    """A persistent map from string to string backed by a CHK store."""
101
102    __slots__ = ('_store', '_root_node', '_search_key_func')
103
104    def __init__(self, store, root_key, search_key_func=None):
105        """Create a CHKMap object.
106
107        :param store: The store the CHKMap is stored in.
108        :param root_key: The root key of the map. None to create an empty
109            CHKMap.
110        :param search_key_func: A function mapping a key => bytes. These bytes
111            are then used by the internal nodes to split up leaf nodes into
112            multiple pages.
113        """
114        self._store = store
115        if search_key_func is None:
116            search_key_func = _search_key_plain
117        self._search_key_func = search_key_func
118        if root_key is None:
119            self._root_node = LeafNode(search_key_func=search_key_func)
120        else:
121            self._root_node = self._node_key(root_key)
122
123    def apply_delta(self, delta):
124        """Apply a delta to the map.
125
126        :param delta: An iterable of old_key, new_key, new_value tuples.
127            If new_key is not None, then new_key->new_value is inserted
128            into the map; if old_key is not None, then the old mapping
129            of old_key is removed.
130        """
131        has_deletes = False
132        # Check preconditions first.
133        as_st = StaticTuple.from_sequence
134        new_items = {as_st(key) for (old, key, value) in delta
135                     if key is not None and old is None}
136        existing_new = list(self.iteritems(key_filter=new_items))
137        if existing_new:
138            raise errors.InconsistentDeltaDelta(delta,
139                                                "New items are already in the map %r." % existing_new)
140        # Now apply changes.
141        for old, new, value in delta:
142            if old is not None and old != new:
143                self.unmap(old, check_remap=False)
144                has_deletes = True
145        for old, new, value in delta:
146            if new is not None:
147                self.map(new, value)
148        if has_deletes:
149            self._check_remap()
150        return self._save()
151
152    def _ensure_root(self):
153        """Ensure that the root node is an object not a key."""
154        if isinstance(self._root_node, StaticTuple):
155            # Demand-load the root
156            self._root_node = self._get_node(self._root_node)
157
158    def _get_node(self, node):
159        """Get a node.
160
161        Note that this does not update the _items dict in objects containing a
162        reference to this node. As such it does not prevent subsequent IO being
163        performed.
164
165        :param node: A tuple key or node object.
166        :return: A node object.
167        """
168        if isinstance(node, StaticTuple):
169            bytes = self._read_bytes(node)
170            return _deserialise(bytes, node,
171                                search_key_func=self._search_key_func)
172        else:
173            return node
174
175    def _read_bytes(self, key):
176        try:
177            return _get_cache()[key]
178        except KeyError:
179            stream = self._store.get_record_stream([key], 'unordered', True)
180            bytes = next(stream).get_bytes_as('fulltext')
181            _get_cache()[key] = bytes
182            return bytes
183
184    def _dump_tree(self, include_keys=False, encoding='utf-8'):
185        """Return the tree in a string representation."""
186        self._ensure_root()
187        def decode(x): return x.decode(encoding)
188        res = self._dump_tree_node(self._root_node, prefix=b'', indent='',
189                                   decode=decode, include_keys=include_keys)
190        res.append('')  # Give a trailing '\n'
191        return '\n'.join(res)
192
193    def _dump_tree_node(self, node, prefix, indent, decode, include_keys=True):
194        """For this node and all children, generate a string representation."""
195        result = []
196        if not include_keys:
197            key_str = ''
198        else:
199            node_key = node.key()
200            if node_key is not None:
201                key_str = ' %s' % (decode(node_key[0]),)
202            else:
203                key_str = ' None'
204        result.append('%s%r %s%s' % (indent, decode(prefix), node.__class__.__name__,
205                                     key_str))
206        if isinstance(node, InternalNode):
207            # Trigger all child nodes to get loaded
208            list(node._iter_nodes(self._store))
209            for prefix, sub in sorted(node._items.items()):
210                result.extend(self._dump_tree_node(sub, prefix, indent + '  ',
211                                                   decode=decode, include_keys=include_keys))
212        else:
213            for key, value in sorted(node._items.items()):
214                # Don't use prefix nor indent here to line up when used in
215                # tests in conjunction with assertEqualDiff
216                result.append('      %r %r' % (
217                    tuple([decode(ke) for ke in key]), decode(value)))
218        return result
219
220    @classmethod
221    def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
222                  search_key_func=None):
223        """Create a CHKMap in store with initial_value as the content.
224
225        :param store: The store to record initial_value in, a VersionedFiles
226            object with 1-tuple keys supporting CHK key generation.
227        :param initial_value: A dict to store in store. Its keys and values
228            must be bytestrings.
229        :param maximum_size: The maximum_size rule to apply to nodes. This
230            determines the size at which no new data is added to a single node.
231        :param key_width: The number of elements in each key_tuple being stored
232            in this map.
233        :param search_key_func: A function mapping a key => bytes. These bytes
234            are then used by the internal nodes to split up leaf nodes into
235            multiple pages.
236        :return: The root chk of the resulting CHKMap.
237        """
238        root_key = klass._create_directly(store, initial_value,
239                                          maximum_size=maximum_size, key_width=key_width,
240                                          search_key_func=search_key_func)
241        if not isinstance(root_key, StaticTuple):
242            raise AssertionError('we got a %s instead of a StaticTuple'
243                                 % (type(root_key),))
244        return root_key
245
246    @classmethod
247    def _create_via_map(klass, store, initial_value, maximum_size=0,
248                        key_width=1, search_key_func=None):
249        result = klass(store, None, search_key_func=search_key_func)
250        result._root_node.set_maximum_size(maximum_size)
251        result._root_node._key_width = key_width
252        delta = []
253        for key, value in initial_value.items():
254            delta.append((None, key, value))
255        root_key = result.apply_delta(delta)
256        return root_key
257
258    @classmethod
259    def _create_directly(klass, store, initial_value, maximum_size=0,
260                         key_width=1, search_key_func=None):
261        node = LeafNode(search_key_func=search_key_func)
262        node.set_maximum_size(maximum_size)
263        node._key_width = key_width
264        as_st = StaticTuple.from_sequence
265        node._items = dict((as_st(key), val)
266                           for key, val in initial_value.items())
267        node._raw_size = sum(node._key_value_len(key, value)
268                             for key, value in node._items.items())
269        node._len = len(node._items)
270        node._compute_search_prefix()
271        node._compute_serialised_prefix()
272        if (node._len > 1 and
273            maximum_size and
274                node._current_size() > maximum_size):
275            prefix, node_details = node._split(store)
276            if len(node_details) == 1:
277                raise AssertionError('Failed to split using node._split')
278            node = InternalNode(prefix, search_key_func=search_key_func)
279            node.set_maximum_size(maximum_size)
280            node._key_width = key_width
281            for split, subnode in node_details:
282                node.add_node(split, subnode)
283        keys = list(node.serialise(store))
284        return keys[-1]
285
286    def iter_changes(self, basis):
287        """Iterate over the changes between basis and self.
288
289        :return: An iterator of tuples: (key, old_value, new_value). Old_value
290            is None for keys only in self; new_value is None for keys only in
291            basis.
292        """
293        # Overview:
294        # Read both trees in lexographic, highest-first order.
295        # Any identical nodes we skip
296        # Any unique prefixes we output immediately.
297        # values in a leaf node are treated as single-value nodes in the tree
298        # which allows them to be not-special-cased. We know to output them
299        # because their value is a string, not a key(tuple) or node.
300        #
301        # corner cases to beware of when considering this function:
302        # *) common references are at different heights.
303        #    consider two trees:
304        #    {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
305        #    {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
306        #                        'ab':LeafNode={'ab':'bar'}}
307        #     'b': LeafNode={'b'}}
308        #    the node with aaa/aab will only be encountered in the second tree
309        #    after reading the 'a' subtree, but it is encountered in the first
310        #    tree immediately. Variations on this may have read internal nodes
311        #    like this.  we want to cut the entire pending subtree when we
312        #    realise we have a common node.  For this we use a list of keys -
313        #    the path to a node - and check the entire path is clean as we
314        #    process each item.
315        if self._node_key(self._root_node) == self._node_key(basis._root_node):
316            return
317        self._ensure_root()
318        basis._ensure_root()
319        excluded_keys = set()
320        self_node = self._root_node
321        basis_node = basis._root_node
322        # A heap, each element is prefix, node(tuple/NodeObject/string),
323        # key_path (a list of tuples, tail-sharing down the tree.)
324        self_pending = []
325        basis_pending = []
326
327        def process_node(node, path, a_map, pending):
328            # take a node and expand it
329            node = a_map._get_node(node)
330            if isinstance(node, LeafNode):
331                path = (node._key, path)
332                for key, value in node._items.items():
333                    # For a LeafNode, the key is a serialized_key, rather than
334                    # a search_key, but the heap is using search_keys
335                    search_key = node._search_key_func(key)
336                    heapq.heappush(pending, (search_key, key, value, path))
337            else:
338                # type(node) == InternalNode
339                path = (node._key, path)
340                for prefix, child in node._items.items():
341                    heapq.heappush(pending, (prefix, None, child, path))
342
343        def process_common_internal_nodes(self_node, basis_node):
344            self_items = set(self_node._items.items())
345            basis_items = set(basis_node._items.items())
346            path = (self_node._key, None)
347            for prefix, child in self_items - basis_items:
348                heapq.heappush(self_pending, (prefix, None, child, path))
349            path = (basis_node._key, None)
350            for prefix, child in basis_items - self_items:
351                heapq.heappush(basis_pending, (prefix, None, child, path))
352
353        def process_common_leaf_nodes(self_node, basis_node):
354            self_items = set(self_node._items.items())
355            basis_items = set(basis_node._items.items())
356            path = (self_node._key, None)
357            for key, value in self_items - basis_items:
358                prefix = self._search_key_func(key)
359                heapq.heappush(self_pending, (prefix, key, value, path))
360            path = (basis_node._key, None)
361            for key, value in basis_items - self_items:
362                prefix = basis._search_key_func(key)
363                heapq.heappush(basis_pending, (prefix, key, value, path))
364
365        def process_common_prefix_nodes(self_node, self_path,
366                                        basis_node, basis_path):
367            # Would it be more efficient if we could request both at the same
368            # time?
369            self_node = self._get_node(self_node)
370            basis_node = basis._get_node(basis_node)
371            if (isinstance(self_node, InternalNode) and
372                    isinstance(basis_node, InternalNode)):
373                # Matching internal nodes
374                process_common_internal_nodes(self_node, basis_node)
375            elif (isinstance(self_node, LeafNode) and
376                  isinstance(basis_node, LeafNode)):
377                process_common_leaf_nodes(self_node, basis_node)
378            else:
379                process_node(self_node, self_path, self, self_pending)
380                process_node(basis_node, basis_path, basis, basis_pending)
381        process_common_prefix_nodes(self_node, None, basis_node, None)
382        self_seen = set()
383        basis_seen = set()
384        excluded_keys = set()
385
386        def check_excluded(key_path):
387            # Note that this is N^2, it depends on us trimming trees
388            # aggressively to not become slow.
389            # A better implementation would probably have a reverse map
390            # back to the children of a node, and jump straight to it when
391            # a common node is detected, the proceed to remove the already
392            # pending children. breezy.graph has a searcher module with a
393            # similar problem.
394            while key_path is not None:
395                key, key_path = key_path
396                if key in excluded_keys:
397                    return True
398            return False
399
400        loop_counter = 0
401        while self_pending or basis_pending:
402            loop_counter += 1
403            if not self_pending:
404                # self is exhausted: output remainder of basis
405                for prefix, key, node, path in basis_pending:
406                    if check_excluded(path):
407                        continue
408                    node = basis._get_node(node)
409                    if key is not None:
410                        # a value
411                        yield (key, node, None)
412                    else:
413                        # subtree - fastpath the entire thing.
414                        for key, value in node.iteritems(basis._store):
415                            yield (key, value, None)
416                return
417            elif not basis_pending:
418                # basis is exhausted: output remainder of self.
419                for prefix, key, node, path in self_pending:
420                    if check_excluded(path):
421                        continue
422                    node = self._get_node(node)
423                    if key is not None:
424                        # a value
425                        yield (key, None, node)
426                    else:
427                        # subtree - fastpath the entire thing.
428                        for key, value in node.iteritems(self._store):
429                            yield (key, None, value)
430                return
431            else:
432                # XXX: future optimisation - yield the smaller items
433                # immediately rather than pushing everything on/off the
434                # heaps. Applies to both internal nodes and leafnodes.
435                if self_pending[0][0] < basis_pending[0][0]:
436                    # expand self
437                    prefix, key, node, path = heapq.heappop(self_pending)
438                    if check_excluded(path):
439                        continue
440                    if key is not None:
441                        # a value
442                        yield (key, None, node)
443                    else:
444                        process_node(node, path, self, self_pending)
445                        continue
446                elif self_pending[0][0] > basis_pending[0][0]:
447                    # expand basis
448                    prefix, key, node, path = heapq.heappop(basis_pending)
449                    if check_excluded(path):
450                        continue
451                    if key is not None:
452                        # a value
453                        yield (key, node, None)
454                    else:
455                        process_node(node, path, basis, basis_pending)
456                        continue
457                else:
458                    # common prefix: possibly expand both
459                    if self_pending[0][1] is None:
460                        # process next self
461                        read_self = True
462                    else:
463                        read_self = False
464                    if basis_pending[0][1] is None:
465                        # process next basis
466                        read_basis = True
467                    else:
468                        read_basis = False
469                    if not read_self and not read_basis:
470                        # compare a common value
471                        self_details = heapq.heappop(self_pending)
472                        basis_details = heapq.heappop(basis_pending)
473                        if self_details[2] != basis_details[2]:
474                            yield (self_details[1],
475                                   basis_details[2], self_details[2])
476                        continue
477                    # At least one side wasn't a simple value
478                    if (self._node_key(self_pending[0][2])
479                            == self._node_key(basis_pending[0][2])):
480                        # Identical pointers, skip (and don't bother adding to
481                        # excluded, it won't turn up again.
482                        heapq.heappop(self_pending)
483                        heapq.heappop(basis_pending)
484                        continue
485                    # Now we need to expand this node before we can continue
486                    if read_self and read_basis:
487                        # Both sides start with the same prefix, so process
488                        # them in parallel
489                        self_prefix, _, self_node, self_path = heapq.heappop(
490                            self_pending)
491                        basis_prefix, _, basis_node, basis_path = heapq.heappop(
492                            basis_pending)
493                        if self_prefix != basis_prefix:
494                            raise AssertionError(
495                                '%r != %r' % (self_prefix, basis_prefix))
496                        process_common_prefix_nodes(
497                            self_node, self_path,
498                            basis_node, basis_path)
499                        continue
500                    if read_self:
501                        prefix, key, node, path = heapq.heappop(self_pending)
502                        if check_excluded(path):
503                            continue
504                        process_node(node, path, self, self_pending)
505                    if read_basis:
506                        prefix, key, node, path = heapq.heappop(basis_pending)
507                        if check_excluded(path):
508                            continue
509                        process_node(node, path, basis, basis_pending)
510        # print loop_counter
511
512    def iteritems(self, key_filter=None):
513        """Iterate over the entire CHKMap's contents."""
514        self._ensure_root()
515        if key_filter is not None:
516            as_st = StaticTuple.from_sequence
517            key_filter = [as_st(key) for key in key_filter]
518        return self._root_node.iteritems(self._store, key_filter=key_filter)
519
520    def key(self):
521        """Return the key for this map."""
522        if isinstance(self._root_node, StaticTuple):
523            return self._root_node
524        else:
525            return self._root_node._key
526
527    def __len__(self):
528        self._ensure_root()
529        return len(self._root_node)
530
531    def map(self, key, value):
532        """Map a key tuple to value.
533
534        :param key: A key to map.
535        :param value: The value to assign to key.
536        """
537        key = StaticTuple.from_sequence(key)
538        # Need a root object.
539        self._ensure_root()
540        prefix, node_details = self._root_node.map(self._store, key, value)
541        if len(node_details) == 1:
542            self._root_node = node_details[0][1]
543        else:
544            self._root_node = InternalNode(prefix,
545                                           search_key_func=self._search_key_func)
546            self._root_node.set_maximum_size(node_details[0][1].maximum_size)
547            self._root_node._key_width = node_details[0][1]._key_width
548            for split, node in node_details:
549                self._root_node.add_node(split, node)
550
551    def _node_key(self, node):
552        """Get the key for a node whether it's a tuple or node."""
553        if isinstance(node, tuple):
554            node = StaticTuple.from_sequence(node)
555        if isinstance(node, StaticTuple):
556            return node
557        else:
558            return node._key
559
560    def unmap(self, key, check_remap=True):
561        """remove key from the map."""
562        key = StaticTuple.from_sequence(key)
563        self._ensure_root()
564        if isinstance(self._root_node, InternalNode):
565            unmapped = self._root_node.unmap(self._store, key,
566                                             check_remap=check_remap)
567        else:
568            unmapped = self._root_node.unmap(self._store, key)
569        self._root_node = unmapped
570
571    def _check_remap(self):
572        """Check if nodes can be collapsed."""
573        self._ensure_root()
574        if isinstance(self._root_node, InternalNode):
575            self._root_node = self._root_node._check_remap(self._store)
576
577    def _save(self):
578        """Save the map completely.
579
580        :return: The key of the root node.
581        """
582        if isinstance(self._root_node, StaticTuple):
583            # Already saved.
584            return self._root_node
585        keys = list(self._root_node.serialise(self._store))
586        return keys[-1]
587
588
589class Node(object):
590    """Base class defining the protocol for CHK Map nodes.
591
592    :ivar _raw_size: The total size of the serialized key:value data, before
593        adding the header bytes, and without prefix compression.
594    """
595
596    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
597                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
598                 )
599
600    def __init__(self, key_width=1):
601        """Create a node.
602
603        :param key_width: The width of keys for this node.
604        """
605        self._key = None
606        # Current number of elements
607        self._len = 0
608        self._maximum_size = 0
609        self._key_width = key_width
610        # current size in bytes
611        self._raw_size = 0
612        # The pointers/values this node has - meaning defined by child classes.
613        self._items = {}
614        # The common search prefix
615        self._search_prefix = None
616
617    def __repr__(self):
618        items_str = str(sorted(self._items))
619        if len(items_str) > 20:
620            items_str = items_str[:16] + '...]'
621        return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
622            self.__class__.__name__, self._key, self._len, self._raw_size,
623            self._maximum_size, self._search_prefix, items_str)
624
625    def key(self):
626        return self._key
627
628    def __len__(self):
629        return self._len
630
631    @property
632    def maximum_size(self):
633        """What is the upper limit for adding references to a node."""
634        return self._maximum_size
635
636    def set_maximum_size(self, new_size):
637        """Set the size threshold for nodes.
638
639        :param new_size: The size at which no data is added to a node. 0 for
640            unlimited.
641        """
642        self._maximum_size = new_size
643
644    @classmethod
645    def common_prefix(cls, prefix, key):
646        """Given 2 strings, return the longest prefix common to both.
647
648        :param prefix: This has been the common prefix for other keys, so it is
649            more likely to be the common prefix in this case as well.
650        :param key: Another string to compare to
651        """
652        if key.startswith(prefix):
653            return prefix
654        pos = -1
655        # Is there a better way to do this?
656        for pos, (left, right) in enumerate(zip(prefix, key)):
657            if left != right:
658                pos -= 1
659                break
660        common = prefix[:pos + 1]
661        return common
662
663    @classmethod
664    def common_prefix_for_keys(cls, keys):
665        """Given a list of keys, find their common prefix.
666
667        :param keys: An iterable of strings.
668        :return: The longest common prefix of all keys.
669        """
670        common_prefix = None
671        for key in keys:
672            if common_prefix is None:
673                common_prefix = key
674                continue
675            common_prefix = cls.common_prefix(common_prefix, key)
676            if not common_prefix:
677                # if common_prefix is the empty string, then we know it won't
678                # change further
679                return b''
680        return common_prefix
681
682
683# Singleton indicating we have not computed _search_prefix yet
684_unknown = object()
685
686
687class LeafNode(Node):
688    """A node containing actual key:value pairs.
689
690    :ivar _items: A dict of key->value items. The key is in tuple form.
691    :ivar _size: The number of bytes that would be used by serializing all of
692        the key/value pairs.
693    """
694
695    __slots__ = ('_common_serialised_prefix',)
696
697    def __init__(self, search_key_func=None):
698        Node.__init__(self)
699        # All of the keys in this leaf node share this common prefix
700        self._common_serialised_prefix = None
701        if search_key_func is None:
702            self._search_key_func = _search_key_plain
703        else:
704            self._search_key_func = search_key_func
705
706    def __repr__(self):
707        items_str = str(sorted(self._items))
708        if len(items_str) > 20:
709            items_str = items_str[:16] + '...]'
710        return \
711            '%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
712            % (self.__class__.__name__, self._key, self._len, self._raw_size,
713               self._maximum_size, self._search_prefix, self._key_width, items_str)
714
715    def _current_size(self):
716        """Answer the current serialised size of this node.
717
718        This differs from self._raw_size in that it includes the bytes used for
719        the header.
720        """
721        if self._common_serialised_prefix is None:
722            bytes_for_items = 0
723            prefix_len = 0
724        else:
725            # We will store a single string with the common prefix
726            # And then that common prefix will not be stored in any of the
727            # entry lines
728            prefix_len = len(self._common_serialised_prefix)
729            bytes_for_items = (self._raw_size - (prefix_len * self._len))
730        return (9 +  # 'chkleaf:\n' +
731                len(str(self._maximum_size)) + 1 +
732                len(str(self._key_width)) + 1 +
733                len(str(self._len)) + 1 +
734                prefix_len + 1 +
735                bytes_for_items)
736
737    @classmethod
738    def deserialise(klass, bytes, key, search_key_func=None):
739        """Deserialise bytes, with key key, into a LeafNode.
740
741        :param bytes: The bytes of the node.
742        :param key: The key that the serialised node has.
743        """
744        key = static_tuple.expect_static_tuple(key)
745        return _deserialise_leaf_node(bytes, key,
746                                      search_key_func=search_key_func)
747
748    def iteritems(self, store, key_filter=None):
749        """Iterate over items in the node.
750
751        :param key_filter: A filter to apply to the node. It should be a
752            list/set/dict or similar repeatedly iterable container.
753        """
754        if key_filter is not None:
755            # Adjust the filter - short elements go to a prefix filter. All
756            # other items are looked up directly.
757            # XXX: perhaps defaultdict? Profiling<rinse and repeat>
758            filters = {}
759            for key in key_filter:
760                if len(key) == self._key_width:
761                    # This filter is meant to match exactly one key, yield it
762                    # if we have it.
763                    try:
764                        yield key, self._items[key]
765                    except KeyError:
766                        # This key is not present in this map, continue
767                        pass
768                else:
769                    # Short items, we need to match based on a prefix
770                    filters.setdefault(len(key), set()).add(key)
771            if filters:
772                filters_itemview = filters.items()
773                for item in self._items.items():
774                    for length, length_filter in filters_itemview:
775                        if item[0][:length] in length_filter:
776                            yield item
777                            break
778        else:
779            yield from self._items.items()
780
781    def _key_value_len(self, key, value):
782        # TODO: Should probably be done without actually joining the key, but
783        #       then that can be done via the C extension
784        return (len(self._serialise_key(key)) + 1 +
785                len(b'%d' % value.count(b'\n')) + 1 +
786                len(value) + 1)
787
788    def _search_key(self, key):
789        return self._search_key_func(key)
790
791    def _map_no_split(self, key, value):
792        """Map a key to a value.
793
794        This assumes either the key does not already exist, or you have already
795        removed its size and length from self.
796
797        :return: True if adding this node should cause us to split.
798        """
799        self._items[key] = value
800        self._raw_size += self._key_value_len(key, value)
801        self._len += 1
802        serialised_key = self._serialise_key(key)
803        if self._common_serialised_prefix is None:
804            self._common_serialised_prefix = serialised_key
805        else:
806            self._common_serialised_prefix = self.common_prefix(
807                self._common_serialised_prefix, serialised_key)
808        search_key = self._search_key(key)
809        if self._search_prefix is _unknown:
810            self._compute_search_prefix()
811        if self._search_prefix is None:
812            self._search_prefix = search_key
813        else:
814            self._search_prefix = self.common_prefix(
815                self._search_prefix, search_key)
816        if (self._len > 1 and
817            self._maximum_size and
818                self._current_size() > self._maximum_size):
819            # Check to see if all of the search_keys for this node are
820            # identical. We allow the node to grow under that circumstance
821            # (we could track this as common state, but it is infrequent)
822            if (search_key != self._search_prefix or
823                    not self._are_search_keys_identical()):
824                return True
825        return False
826
827    def _split(self, store):
828        """We have overflowed.
829
830        Split this node into multiple LeafNodes, return it up the stack so that
831        the next layer creates a new InternalNode and references the new nodes.
832
833        :return: (common_serialised_prefix, [(node_serialised_prefix, node)])
834        """
835        if self._search_prefix is _unknown:
836            raise AssertionError('Search prefix must be known')
837        common_prefix = self._search_prefix
838        split_at = len(common_prefix) + 1
839        result = {}
840        for key, value in self._items.items():
841            search_key = self._search_key(key)
842            prefix = search_key[:split_at]
843            # TODO: Generally only 1 key can be exactly the right length,
844            #       which means we can only have 1 key in the node pointed
845            #       at by the 'prefix\0' key. We might want to consider
846            #       folding it into the containing InternalNode rather than
847            #       having a fixed length-1 node.
848            #       Note this is probably not true for hash keys, as they
849            #       may get a '\00' node anywhere, but won't have keys of
850            #       different lengths.
851            if len(prefix) < split_at:
852                prefix += b'\x00' * (split_at - len(prefix))
853            if prefix not in result:
854                node = LeafNode(search_key_func=self._search_key_func)
855                node.set_maximum_size(self._maximum_size)
856                node._key_width = self._key_width
857                result[prefix] = node
858            else:
859                node = result[prefix]
860            sub_prefix, node_details = node.map(store, key, value)
861            if len(node_details) > 1:
862                if prefix != sub_prefix:
863                    # This node has been split and is now found via a different
864                    # path
865                    result.pop(prefix)
866                new_node = InternalNode(sub_prefix,
867                                        search_key_func=self._search_key_func)
868                new_node.set_maximum_size(self._maximum_size)
869                new_node._key_width = self._key_width
870                for split, node in node_details:
871                    new_node.add_node(split, node)
872                result[prefix] = new_node
873        return common_prefix, list(result.items())
874
875    def map(self, store, key, value):
876        """Map key to value."""
877        if key in self._items:
878            self._raw_size -= self._key_value_len(key, self._items[key])
879            self._len -= 1
880        self._key = None
881        if self._map_no_split(key, value):
882            return self._split(store)
883        else:
884            if self._search_prefix is _unknown:
885                raise AssertionError('%r must be known' % self._search_prefix)
886            return self._search_prefix, [(b"", self)]
887
888    _serialise_key = b'\x00'.join
889
890    def serialise(self, store):
891        """Serialise the LeafNode to store.
892
893        :param store: A VersionedFiles honouring the CHK extensions.
894        :return: An iterable of the keys inserted by this operation.
895        """
896        lines = [b"chkleaf:\n"]
897        lines.append(b"%d\n" % self._maximum_size)
898        lines.append(b"%d\n" % self._key_width)
899        lines.append(b"%d\n" % self._len)
900        if self._common_serialised_prefix is None:
901            lines.append(b'\n')
902            if len(self._items) != 0:
903                raise AssertionError('If _common_serialised_prefix is None'
904                                     ' we should have no items')
905        else:
906            lines.append(b'%s\n' % (self._common_serialised_prefix,))
907            prefix_len = len(self._common_serialised_prefix)
908        for key, value in sorted(self._items.items()):
909            # Always add a final newline
910            value_lines = osutils.chunks_to_lines([value + b'\n'])
911            serialized = b"%s\x00%d\n" % (self._serialise_key(key),
912                                          len(value_lines))
913            if not serialized.startswith(self._common_serialised_prefix):
914                raise AssertionError('We thought the common prefix was %r'
915                                     ' but entry %r does not have it in common'
916                                     % (self._common_serialised_prefix, serialized))
917            lines.append(serialized[prefix_len:])
918            lines.extend(value_lines)
919        sha1, _, _ = store.add_lines((None,), (), lines)
920        self._key = StaticTuple(b"sha1:" + sha1,).intern()
921        data = b''.join(lines)
922        if len(data) != self._current_size():
923            raise AssertionError('Invalid _current_size')
924        _get_cache()[self._key] = data
925        return [self._key]
926
927    def refs(self):
928        """Return the references to other CHK's held by this node."""
929        return []
930
931    def _compute_search_prefix(self):
932        """Determine the common search prefix for all keys in this node.
933
934        :return: A bytestring of the longest search key prefix that is
935            unique within this node.
936        """
937        search_keys = [self._search_key_func(key) for key in self._items]
938        self._search_prefix = self.common_prefix_for_keys(search_keys)
939        return self._search_prefix
940
941    def _are_search_keys_identical(self):
942        """Check to see if the search keys for all entries are the same.
943
944        When using a hash as the search_key it is possible for non-identical
945        keys to collide. If that happens enough, we may try overflow a
946        LeafNode, but as all are collisions, we must not split.
947        """
948        common_search_key = None
949        for key in self._items:
950            search_key = self._search_key(key)
951            if common_search_key is None:
952                common_search_key = search_key
953            elif search_key != common_search_key:
954                return False
955        return True
956
957    def _compute_serialised_prefix(self):
958        """Determine the common prefix for serialised keys in this node.
959
960        :return: A bytestring of the longest serialised key prefix that is
961            unique within this node.
962        """
963        serialised_keys = [self._serialise_key(key) for key in self._items]
964        self._common_serialised_prefix = self.common_prefix_for_keys(
965            serialised_keys)
966        return self._common_serialised_prefix
967
968    def unmap(self, store, key):
969        """Unmap key from the node."""
970        try:
971            self._raw_size -= self._key_value_len(key, self._items[key])
972        except KeyError:
973            trace.mutter("key %s not found in %r", key, self._items)
974            raise
975        self._len -= 1
976        del self._items[key]
977        self._key = None
978        # Recompute from scratch
979        self._compute_search_prefix()
980        self._compute_serialised_prefix()
981        return self
982
983
984class InternalNode(Node):
985    """A node that contains references to other nodes.
986
987    An InternalNode is responsible for mapping search key prefixes to child
988    nodes.
989
990    :ivar _items: serialised_key => node dictionary. node may be a tuple,
991        LeafNode or InternalNode.
992    """
993
994    __slots__ = ('_node_width',)
995
996    def __init__(self, prefix=b'', search_key_func=None):
997        Node.__init__(self)
998        # The size of an internalnode with default values and no children.
999        # How many octets key prefixes within this node are.
1000        self._node_width = 0
1001        self._search_prefix = prefix
1002        if search_key_func is None:
1003            self._search_key_func = _search_key_plain
1004        else:
1005            self._search_key_func = search_key_func
1006
1007    def add_node(self, prefix, node):
1008        """Add a child node with prefix prefix, and node node.
1009
1010        :param prefix: The search key prefix for node.
1011        :param node: The node being added.
1012        """
1013        if self._search_prefix is None:
1014            raise AssertionError("_search_prefix should not be None")
1015        if not prefix.startswith(self._search_prefix):
1016            raise AssertionError("prefixes mismatch: %s must start with %s"
1017                                 % (prefix, self._search_prefix))
1018        if len(prefix) != len(self._search_prefix) + 1:
1019            raise AssertionError("prefix wrong length: len(%s) is not %d" %
1020                                 (prefix, len(self._search_prefix) + 1))
1021        self._len += len(node)
1022        if not len(self._items):
1023            self._node_width = len(prefix)
1024        if self._node_width != len(self._search_prefix) + 1:
1025            raise AssertionError("node width mismatch: %d is not %d" %
1026                                 (self._node_width, len(self._search_prefix) + 1))
1027        self._items[prefix] = node
1028        self._key = None
1029
1030    def _current_size(self):
1031        """Answer the current serialised size of this node."""
1032        return (self._raw_size + len(str(self._len)) + len(str(self._key_width))
1033                + len(str(self._maximum_size)))
1034
1035    @classmethod
1036    def deserialise(klass, bytes, key, search_key_func=None):
1037        """Deserialise bytes to an InternalNode, with key key.
1038
1039        :param bytes: The bytes of the node.
1040        :param key: The key that the serialised node has.
1041        :return: An InternalNode instance.
1042        """
1043        key = static_tuple.expect_static_tuple(key)
1044        return _deserialise_internal_node(bytes, key,
1045                                          search_key_func=search_key_func)
1046
1047    def iteritems(self, store, key_filter=None):
1048        for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
1049            for item in node.iteritems(store, key_filter=node_filter):
1050                yield item
1051
1052    def _iter_nodes(self, store, key_filter=None, batch_size=None):
1053        """Iterate over node objects which match key_filter.
1054
1055        :param store: A store to use for accessing content.
1056        :param key_filter: A key filter to filter nodes. Only nodes that might
1057            contain a key in key_filter will be returned.
1058        :param batch_size: If not None, then we will return the nodes that had
1059            to be read using get_record_stream in batches, rather than reading
1060            them all at once.
1061        :return: An iterable of nodes. This function does not have to be fully
1062            consumed.  (There will be no pending I/O when items are being returned.)
1063        """
1064        # Map from chk key ('sha1:...',) to (prefix, key_filter)
1065        # prefix is the key in self._items to use, key_filter is the key_filter
1066        # entries that would match this node
1067        keys = {}
1068        shortcut = False
1069        if key_filter is None:
1070            # yielding all nodes, yield whatever we have, and queue up a read
1071            # for whatever we are missing
1072            shortcut = True
1073            for prefix, node in self._items.items():
1074                if node.__class__ is StaticTuple:
1075                    keys[node] = (prefix, None)
1076                else:
1077                    yield node, None
1078        elif len(key_filter) == 1:
1079            # Technically, this path could also be handled by the first check
1080            # in 'self._node_width' in length_filters. However, we can handle
1081            # this case without spending any time building up the
1082            # prefix_to_keys, etc state.
1083
1084            # This is a bit ugly, but TIMEIT showed it to be by far the fastest
1085            # 0.626us   list(key_filter)[0]
1086            #       is a func() for list(), 2 mallocs, and a getitem
1087            # 0.489us   [k for k in key_filter][0]
1088            #       still has the mallocs, avoids the func() call
1089            # 0.350us   iter(key_filter).next()
1090            #       has a func() call, and mallocs an iterator
1091            # 0.125us   for key in key_filter: pass
1092            #       no func() overhead, might malloc an iterator
1093            # 0.105us   for key in key_filter: break
1094            #       no func() overhead, might malloc an iterator, probably
1095            #       avoids checking an 'else' clause as part of the for
1096            for key in key_filter:
1097                break
1098            search_prefix = self._search_prefix_filter(key)
1099            if len(search_prefix) == self._node_width:
1100                # This item will match exactly, so just do a dict lookup, and
1101                # see what we can return
1102                shortcut = True
1103                try:
1104                    node = self._items[search_prefix]
1105                except KeyError:
1106                    # A given key can only match 1 child node, if it isn't
1107                    # there, then we can just return nothing
1108                    return
1109                if node.__class__ is StaticTuple:
1110                    keys[node] = (search_prefix, [key])
1111                else:
1112                    # This is loaded, and the only thing that can match,
1113                    # return
1114                    yield node, [key]
1115                    return
1116        if not shortcut:
1117            # First, convert all keys into a list of search prefixes
1118            # Aggregate common prefixes, and track the keys they come from
1119            prefix_to_keys = {}
1120            length_filters = {}
1121            for key in key_filter:
1122                search_prefix = self._search_prefix_filter(key)
1123                length_filter = length_filters.setdefault(
1124                    len(search_prefix), set())
1125                length_filter.add(search_prefix)
1126                prefix_to_keys.setdefault(search_prefix, []).append(key)
1127
1128            if (self._node_width in length_filters and
1129                    len(length_filters) == 1):
1130                # all of the search prefixes match exactly _node_width. This
1131                # means that everything is an exact match, and we can do a
1132                # lookup into self._items, rather than iterating over the items
1133                # dict.
1134                search_prefixes = length_filters[self._node_width]
1135                for search_prefix in search_prefixes:
1136                    try:
1137                        node = self._items[search_prefix]
1138                    except KeyError:
1139                        # We can ignore this one
1140                        continue
1141                    node_key_filter = prefix_to_keys[search_prefix]
1142                    if node.__class__ is StaticTuple:
1143                        keys[node] = (search_prefix, node_key_filter)
1144                    else:
1145                        yield node, node_key_filter
1146            else:
1147                # The slow way. We walk every item in self._items, and check to
1148                # see if there are any matches
1149                length_filters_itemview = length_filters.items()
1150                for prefix, node in self._items.items():
1151                    node_key_filter = []
1152                    for length, length_filter in length_filters_itemview:
1153                        sub_prefix = prefix[:length]
1154                        if sub_prefix in length_filter:
1155                            node_key_filter.extend(prefix_to_keys[sub_prefix])
1156                    if node_key_filter:  # this key matched something, yield it
1157                        if node.__class__ is StaticTuple:
1158                            keys[node] = (prefix, node_key_filter)
1159                        else:
1160                            yield node, node_key_filter
1161        if keys:
1162            # Look in the page cache for some more bytes
1163            found_keys = set()
1164            for key in keys:
1165                try:
1166                    bytes = _get_cache()[key]
1167                except KeyError:
1168                    continue
1169                else:
1170                    node = _deserialise(bytes, key,
1171                                        search_key_func=self._search_key_func)
1172                    prefix, node_key_filter = keys[key]
1173                    self._items[prefix] = node
1174                    found_keys.add(key)
1175                    yield node, node_key_filter
1176            for key in found_keys:
1177                del keys[key]
1178        if keys:
1179            # demand load some pages.
1180            if batch_size is None:
1181                # Read all the keys in
1182                batch_size = len(keys)
1183            key_order = list(keys)
1184            for batch_start in range(0, len(key_order), batch_size):
1185                batch = key_order[batch_start:batch_start + batch_size]
1186                # We have to fully consume the stream so there is no pending
1187                # I/O, so we buffer the nodes for now.
1188                stream = store.get_record_stream(batch, 'unordered', True)
1189                node_and_filters = []
1190                for record in stream:
1191                    bytes = record.get_bytes_as('fulltext')
1192                    node = _deserialise(bytes, record.key,
1193                                        search_key_func=self._search_key_func)
1194                    prefix, node_key_filter = keys[record.key]
1195                    node_and_filters.append((node, node_key_filter))
1196                    self._items[prefix] = node
1197                    _get_cache()[record.key] = bytes
1198                for info in node_and_filters:
1199                    yield info
1200
1201    def map(self, store, key, value):
1202        """Map key to value."""
1203        if not len(self._items):
1204            raise AssertionError("can't map in an empty InternalNode.")
1205        search_key = self._search_key(key)
1206        if self._node_width != len(self._search_prefix) + 1:
1207            raise AssertionError("node width mismatch: %d is not %d" %
1208                                 (self._node_width, len(self._search_prefix) + 1))
1209        if not search_key.startswith(self._search_prefix):
1210            # This key doesn't fit in this index, so we need to split at the
1211            # point where it would fit, insert self into that internal node,
1212            # and then map this key into that node.
1213            new_prefix = self.common_prefix(self._search_prefix,
1214                                            search_key)
1215            new_parent = InternalNode(new_prefix,
1216                                      search_key_func=self._search_key_func)
1217            new_parent.set_maximum_size(self._maximum_size)
1218            new_parent._key_width = self._key_width
1219            new_parent.add_node(self._search_prefix[:len(new_prefix) + 1],
1220                                self)
1221            return new_parent.map(store, key, value)
1222        children = [node for node, _ in self._iter_nodes(
1223            store, key_filter=[key])]
1224        if children:
1225            child = children[0]
1226        else:
1227            # new child needed:
1228            child = self._new_child(search_key, LeafNode)
1229        old_len = len(child)
1230        if isinstance(child, LeafNode):
1231            old_size = child._current_size()
1232        else:
1233            old_size = None
1234        prefix, node_details = child.map(store, key, value)
1235        if len(node_details) == 1:
1236            # child may have shrunk, or might be a new node
1237            child = node_details[0][1]
1238            self._len = self._len - old_len + len(child)
1239            self._items[search_key] = child
1240            self._key = None
1241            new_node = self
1242            if isinstance(child, LeafNode):
1243                if old_size is None:
1244                    # The old node was an InternalNode which means it has now
1245                    # collapsed, so we need to check if it will chain to a
1246                    # collapse at this level.
1247                    trace.mutter("checking remap as InternalNode -> LeafNode")
1248                    new_node = self._check_remap(store)
1249                else:
1250                    # If the LeafNode has shrunk in size, we may want to run
1251                    # a remap check. Checking for a remap is expensive though
1252                    # and the frequency of a successful remap is very low.
1253                    # Shrinkage by small amounts is common, so we only do the
1254                    # remap check if the new_size is low or the shrinkage
1255                    # amount is over a configurable limit.
1256                    new_size = child._current_size()
1257                    shrinkage = old_size - new_size
1258                    if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE or
1259                            shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
1260                        trace.mutter(
1261                            "checking remap as size shrunk by %d to be %d",
1262                            shrinkage, new_size)
1263                        new_node = self._check_remap(store)
1264            if new_node._search_prefix is None:
1265                raise AssertionError("_search_prefix should not be None")
1266            return new_node._search_prefix, [(b'', new_node)]
1267        # child has overflowed - create a new intermediate node.
1268        # XXX: This is where we might want to try and expand our depth
1269        # to refer to more bytes of every child (which would give us
1270        # multiple pointers to child nodes, but fewer intermediate nodes)
1271        child = self._new_child(search_key, InternalNode)
1272        child._search_prefix = prefix
1273        for split, node in node_details:
1274            child.add_node(split, node)
1275        self._len = self._len - old_len + len(child)
1276        self._key = None
1277        return self._search_prefix, [(b"", self)]
1278
1279    def _new_child(self, search_key, klass):
1280        """Create a new child node of type klass."""
1281        child = klass()
1282        child.set_maximum_size(self._maximum_size)
1283        child._key_width = self._key_width
1284        child._search_key_func = self._search_key_func
1285        self._items[search_key] = child
1286        return child
1287
1288    def serialise(self, store):
1289        """Serialise the node to store.
1290
1291        :param store: A VersionedFiles honouring the CHK extensions.
1292        :return: An iterable of the keys inserted by this operation.
1293        """
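        # An illustrative sketch (made-up values, not real data) of the byte
        # layout this method emits for an internal node with maximum_size
        # 4096, key_width 1, 3 contained keys and search prefix b'a':
        #
        #   chknode:
        #   4096
        #   1
        #   3
        #   a
        #   b\x00sha1:<hex digest of one child page>
        #   c\x00sha1:<hex digest of another child page>
        #
        # Each child line is the child's search prefix followed by its CHK
        # key, with this node's common search prefix stripped from the front.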
1294        for node in self._items.values():
1295            if isinstance(node, StaticTuple):
1296                # Never deserialised.
1297                continue
1298            if node._key is not None:
1299                # Never altered
1300                continue
1301            for key in node.serialise(store):
1302                yield key
1303        lines = [b"chknode:\n"]
1304        lines.append(b"%d\n" % self._maximum_size)
1305        lines.append(b"%d\n" % self._key_width)
1306        lines.append(b"%d\n" % self._len)
1307        if self._search_prefix is None:
1308            raise AssertionError("_search_prefix should not be None")
1309        lines.append(b'%s\n' % (self._search_prefix,))
1310        prefix_len = len(self._search_prefix)
1311        for prefix, node in sorted(self._items.items()):
1312            if isinstance(node, StaticTuple):
1313                key = node[0]
1314            else:
1315                key = node._key[0]
1316            serialised = b"%s\x00%s\n" % (prefix, key)
1317            if not serialised.startswith(self._search_prefix):
1318                raise AssertionError("prefixes mismatch: %s must start with %s"
1319                                     % (serialised, self._search_prefix))
1320            lines.append(serialised[prefix_len:])
1321        sha1, _, _ = store.add_lines((None,), (), lines)
1322        self._key = StaticTuple(b"sha1:" + sha1,).intern()
1323        _get_cache()[self._key] = b''.join(lines)
1324        yield self._key
1325
1326    def _search_key(self, key):
1327        """Return the serialised key for key in this node."""
1328        # search keys are fixed width. All will be self._node_width wide, so we
1329        # pad as necessary.
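        # For example (illustrative): if the search key computed for ``key``
        # is b'ab' and _node_width is 4, this returns b'ab\x00\x00'; a longer
        # search key such as b'abcdef' would be truncated to b'abcd'.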
1330        return (self._search_key_func(key) + b'\x00' * self._node_width)[:self._node_width]
1331
1332    def _search_prefix_filter(self, key):
1333        """Serialise key for use as a prefix filter in iteritems."""
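        # Unlike _search_key, this does not pad. Illustratively, a search key
        # of b'ab' with _node_width 4 stays b'ab', so short keys can act as
        # prefix filters rather than exact fixed-width keys.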
1334        return self._search_key_func(key)[:self._node_width]
1335
1336    def _split(self, offset):
1337        """Split this node into smaller nodes starting at offset.
1338
1339        :param offset: The offset to start the new child nodes at.
1340        :return: An iterable of (prefix, node) tuples. prefix is a byte
1341            prefix for reaching node.
1342        """
1343        if offset >= self._node_width:
1344            for node in self._items.values():
1345                for result in node._split(offset):
1346                    yield result
1347
1348    def refs(self):
1349        """Return the references to other CHKs held by this node."""
1350        if self._key is None:
1351            raise AssertionError("unserialised nodes have no refs.")
1352        refs = []
1353        for value in self._items.values():
1354            if isinstance(value, StaticTuple):
1355                refs.append(value)
1356            else:
1357                refs.append(value.key())
1358        return refs
1359
1360    def _compute_search_prefix(self, extra_key=None):
1361        """Return the unique key prefix for this node.
1362
1363        :return: A bytestring of the longest search key prefix that is
1364            unique within this node.
1365        """
1366        self._search_prefix = self.common_prefix_for_keys(self._items)
1367        return self._search_prefix
1368
1369    def unmap(self, store, key, check_remap=True):
1370        """Remove key from this node and its children."""
1371        if not len(self._items):
1372            raise AssertionError("can't unmap in an empty InternalNode.")
1373        children = [node for node, _
1374                    in self._iter_nodes(store, key_filter=[key])]
1375        if children:
1376            child = children[0]
1377        else:
1378            raise KeyError(key)
1379        self._len -= 1
1380        unmapped = child.unmap(store, key)
1381        self._key = None
1382        search_key = self._search_key(key)
1383        if len(unmapped) == 0:
1384            # All child nodes are gone, remove the child:
1385            del self._items[search_key]
1386            unmapped = None
1387        else:
1388            # Stash the returned node
1389            self._items[search_key] = unmapped
1390        if len(self._items) == 1:
1391            # this node is no longer needed:
1392            return list(self._items.values())[0]
1393        if isinstance(unmapped, InternalNode):
1394            return self
1395        if check_remap:
1396            return self._check_remap(store)
1397        else:
1398            return self
1399
1400    def _check_remap(self, store):
1401        """Check if all keys contained by children fit in a single LeafNode.
1402
1403        :param store: A store to use for reading more nodes
1404        :return: Either self, or a new LeafNode which should replace self.
1405        """
1406        # Logic for how we determine when we need to rebuild
1407        # 1) Implicitly unmap() is removing a key which means that the child
1408        #    nodes are going to be shrinking by some extent.
1409        # 2) If all children are LeafNodes, it is possible that they could be
1410        #    combined into a single LeafNode, which can then completely replace
1411        #    this internal node with a single LeafNode
1412        # 3) If *one* child is an InternalNode, we assume it has already done
1413        #    all the work to determine that its children cannot collapse, and
1414        #    we can then assume that those nodes *plus* the current nodes don't
1415        #    have a chance of collapsing either.
1416        #    So a very cheap check is to just say if 'unmapped' is an
1417        #    InternalNode, we don't have to check further.
1418
1419        # TODO: Another alternative is to check the total size of all known
1420        #       LeafNodes. If there is some formula we can use to determine the
1421        #       final size without actually having to read in any more
1422        #       children, it would be nice to have. However, we have to be
1423        #       careful with stuff like nodes that pull out the common prefix
1424        #       of each key, as adding a new key can change the common prefix
1425        #       and cause size changes greater than the length of one key.
1426        #       So for now, we just add everything to a new Leaf until it
1427        #       splits, as we know that will give the right answer
1428        new_leaf = LeafNode(search_key_func=self._search_key_func)
1429        new_leaf.set_maximum_size(self._maximum_size)
1430        new_leaf._key_width = self._key_width
1431        # A batch_size of 16 was chosen because:
1432        #   a) In testing, a 4k page held 14 times. So if we have more than 16
1433        #      leaf nodes we are unlikely to hold them in a single new leaf
1434        #      node. This still allows for 1 round trip
1435        #   b) With 16-way fan out, we can still do a single round trip
1436        #   c) With 255-way fan out, we don't want to read all 255 and destroy
1437        #      the page cache, just to determine that we really don't need it.
1438        for node, _ in self._iter_nodes(store, batch_size=16):
1439            if isinstance(node, InternalNode):
1440                # Without looking at any leaf nodes, we are sure
1441                return self
1442            for key, value in node._items.items():
1443                if new_leaf._map_no_split(key, value):
1444                    return self
1445        trace.mutter("remap generated a new LeafNode")
1446        return new_leaf
1447
1448
1449def _deserialise(data, key, search_key_func):
1450    """Helper to convert serialised node bytes into a LeafNode or InternalNode."""
1451    if data.startswith(b"chkleaf:\n"):
1452        node = LeafNode.deserialise(data, key, search_key_func=search_key_func)
1453    elif data.startswith(b"chknode:\n"):
1454        node = InternalNode.deserialise(data, key,
1455                                        search_key_func=search_key_func)
1456    else:
1457        raise AssertionError("Unknown node type.")
1458    return node
1459
1460
1461class CHKMapDifference(object):
1462    """Iterate the stored pages and key,value pairs for (new - old).
1463
1464    This class provides a generator over the stored CHK pages and the
1465    (key, value) pairs that are in any of the new maps and not in any of the
1466    old maps.
1467
1468    Note that it may yield chk pages that are common (especially root nodes),
1469    but it won't yield (key,value) pairs that are common.
1470    """
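    # A minimal usage sketch (hypothetical names; the store and root keys are
    # assumed to be supplied by the caller):
    #
    #   diff = CHKMapDifference(chk_store, new_root_keys, old_root_keys,
    #                           search_key_func=search_key_func)
    #   for record, items in diff.process():
    #       # record is a stored CHK page (or None once only new key/value
    #       # items remain); items is a list of (key, value) pairs that are
    #       # only present in the new maps.
    #       ...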
1471
1472    def __init__(self, store, new_root_keys, old_root_keys,
1473                 search_key_func, pb=None):
1474        # TODO: Should we add a StaticTuple barrier here? It would be nice to
1475        #       force callers to use StaticTuple, because there will often be
1476        #       lots of keys passed in here. And even if we cast it locally,
1477        #       that just means that we will have *both* a StaticTuple and a
1478        #       tuple() in memory, referring to the same object. (so a net
1479        #       increase in memory, not a decrease.)
1480        self._store = store
1481        self._new_root_keys = new_root_keys
1482        self._old_root_keys = old_root_keys
1483        self._pb = pb
1484        # All uninteresting chks that we have seen. By the time they are added
1485        # here, they should be either fully ignored, or queued up for
1486        # processing
1487        # TODO: This might grow to a large size if there are lots of merge
1488        #       parents, etc. However, it probably doesn't scale to O(history)
1489        #       like _processed_new_refs does.
1490        self._all_old_chks = set(self._old_root_keys)
1491        # All items that we have seen from the old_root_keys
1492        self._all_old_items = set()
1493        # These are interesting references which were either read, or are already
1494        # in the interesting queue (so we don't need to walk them again)
1495        # TODO: processed_new_refs becomes O(all_chks), consider switching to
1496        #       SimpleSet here.
1497        self._processed_new_refs = set()
1498        self._search_key_func = search_key_func
1499
1500        # The uninteresting and interesting nodes to be searched
1501        self._old_queue = []
1502        self._new_queue = []
1503        # Holds the (key, value) items found when processing the root nodes,
1504        # waiting for the uninteresting nodes to be walked
1505        self._new_item_queue = []
1506        self._state = None
1507
1508    def _read_nodes_from_store(self, keys):
1509        # We chose not to use _get_cache(), because we think in
1510        # terms of records to be yielded. Also, we expect to touch each page
1511        # only 1 time during this code. (We may want to evaluate saving the
1512        # raw bytes into the page cache, which would allow a working tree
1513        # update after the fetch to not have to read the bytes again.)
1514        as_st = StaticTuple.from_sequence
1515        stream = self._store.get_record_stream(keys, 'unordered', True)
1516        for record in stream:
1517            if self._pb is not None:
1518                self._pb.tick()
1519            if record.storage_kind == 'absent':
1520                raise errors.NoSuchRevision(self._store, record.key)
1521            bytes = record.get_bytes_as('fulltext')
1522            node = _deserialise(bytes, record.key,
1523                                search_key_func=self._search_key_func)
1524            if isinstance(node, InternalNode):
1525                # Note we don't have to do node.refs() because we know that
1526                # there are no children that have been pushed into this node
1527                # Note: Using as_st() here seemed to save 1.2MB, which would
1528                #       indicate that we keep 100k prefix_refs around while
1529                #       processing. They *should* be shorter lived than that...
1530                #       It does cost us ~10s of processing time
1531                prefix_refs = list(node._items.items())
1532                items = []
1533            else:
1534                prefix_refs = []
1535                # Note: We don't use a StaticTuple here. Profiling showed a
1536                #       minor memory improvement (0.8MB out of 335MB peak, 0.2%),
1537                #       but a significant slowdown (15s / 145s, or 10%).
1538                items = list(node._items.items())
1539            yield record, node, prefix_refs, items
1540
1541    def _read_old_roots(self):
1542        old_chks_to_enqueue = []
1543        all_old_chks = self._all_old_chks
1544        for record, node, prefix_refs, items in \
1545                self._read_nodes_from_store(self._old_root_keys):
1546            # Uninteresting node
1547            prefix_refs = [p_r for p_r in prefix_refs
1548                           if p_r[1] not in all_old_chks]
1549            new_refs = [p_r[1] for p_r in prefix_refs]
1550            all_old_chks.update(new_refs)
1551            # TODO: This might be a good time to turn items into StaticTuple
1552            #       instances and possibly intern them. However, this does not
1553            #       impact 'initial branch' performance, so I'm not worrying
1554            #       about this yet
1555            self._all_old_items.update(items)
1556            # Queue up the uninteresting references
1557            # Don't actually put them in the 'to-read' queue until we have
1558            # finished checking the interesting references
1559            old_chks_to_enqueue.extend(prefix_refs)
1560        return old_chks_to_enqueue
1561
1562    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1563        # At this point, we have read all the uninteresting and interesting
1564        # items, so we can queue up the uninteresting stuff, knowing that we've
1565        # handled the interesting ones
1566        for prefix, ref in old_chks_to_enqueue:
1567            not_interesting = True
1568            for i in range(len(prefix), 0, -1):
1569                if prefix[:i] in new_prefixes:
1570                    not_interesting = False
1571                    break
1572            if not_interesting:
1573                # This prefix is not part of the remaining 'interesting set'
1574                continue
1575            self._old_queue.append(ref)
1576
1577    def _read_all_roots(self):
1578        """Read the root pages.
1579
1580        This is structured as a generator, so that the root records can be
1581        yielded up to whoever needs them without any buffering.
1582        """
1583        # This is the bootstrap phase
1584        if not self._old_root_keys:
1585            # With no old_root_keys we can just shortcut and be ready
1586            # for _flush_new_queue
1587            self._new_queue = list(self._new_root_keys)
1588            return
1589        old_chks_to_enqueue = self._read_old_roots()
1590        # filter out any root keys that are already known to be uninteresting
1591        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1592        # These are prefixes that are present in new_keys that we are
1593        # thinking to yield
1594        new_prefixes = set()
1595        # We are about to yield all of these, so we don't want them getting
1596        # added a second time
1597        processed_new_refs = self._processed_new_refs
1598        processed_new_refs.update(new_keys)
1599        for record, node, prefix_refs, items in \
1600                self._read_nodes_from_store(new_keys):
1601            # At this level, we now know all the uninteresting references
1602            # So we filter and queue up whatever is remaining
1603            prefix_refs = [p_r for p_r in prefix_refs
1604                           if p_r[1] not in self._all_old_chks and
1605                           p_r[1] not in processed_new_refs]
1606            refs = [p_r[1] for p_r in prefix_refs]
1607            new_prefixes.update([p_r[0] for p_r in prefix_refs])
1608            self._new_queue.extend(refs)
1609            # TODO: We can potentially get multiple items here, however the
1610            #       current design allows for this, as callers will do the work
1611            #       to make the results unique. We might profile whether we
1612            #       gain anything by ensuring unique return values for items
1613            # TODO: This might be a good time to cast to StaticTuple, as
1614            #       self._new_item_queue will hold the contents of multiple
1615            #       records for an extended lifetime
1616            new_items = [item for item in items
1617                         if item not in self._all_old_items]
1618            self._new_item_queue.extend(new_items)
1619            new_prefixes.update([self._search_key_func(item[0])
1620                                 for item in new_items])
1621            processed_new_refs.update(refs)
1622            yield record
1623        # For new_prefixes we have the full length prefixes queued up.
1624        # However, we also need possible prefixes. (If we have a known ref to
1625        # 'ab', then we also need to include 'a'.) So expand the
1626        # new_prefixes to include all shorter prefixes
1627        for prefix in list(new_prefixes):
1628            new_prefixes.update([prefix[:i] for i in range(1, len(prefix))])
1629        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1630
1631    def _flush_new_queue(self):
1632        # No need to maintain the heap invariant anymore, just pull things out
1633        # and process them
1634        refs = set(self._new_queue)
1635        self._new_queue = []
1636        # First pass, flush all interesting items and convert to using direct refs
1637        all_old_chks = self._all_old_chks
1638        processed_new_refs = self._processed_new_refs
1639        all_old_items = self._all_old_items
1640        new_items = [item for item in self._new_item_queue
1641                     if item not in all_old_items]
1642        self._new_item_queue = []
1643        if new_items:
1644            yield None, new_items
1645        refs = refs.difference(all_old_chks)
1646        processed_new_refs.update(refs)
1647        while refs:
1648            # TODO: Using a SimpleSet for self._processed_new_refs
1649            #       saved as much as 10MB of peak memory. However, it requires
1650            #       implementing a non-pyrex version.
1651            next_refs = set()
1652            next_refs_update = next_refs.update
1653            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1654            # from 1m54s to 1m51s. Consider it.
1655            for record, _, p_refs, items in self._read_nodes_from_store(refs):
1656                if all_old_items:
1657                    # using the 'if' check saves about 145s => 141s, when
1658                    # streaming initial branch of Launchpad data.
1659                    items = [item for item in items
1660                             if item not in all_old_items]
1661                yield record, items
1662                next_refs_update([p_r[1] for p_r in p_refs])
1663                del p_refs
1664            # set1.difference(set/dict) walks all of set1, and checks if it
1665            # exists in 'other'.
1666            # set1.difference(iterable) walks all of iterable, and does a
1667            # 'difference_update' on a clone of set1. Pick wisely based on the
1668            # expected sizes of objects.
1669            # in our case it is expected that 'new_refs' will always be quite
1670            # small.
1671            next_refs = next_refs.difference(all_old_chks)
1672            next_refs = next_refs.difference(processed_new_refs)
1673            processed_new_refs.update(next_refs)
1674            refs = next_refs
1675
1676    def _process_next_old(self):
1677        # Since we don't filter uninteresting nodes any further than during
1678        # _read_all_roots, process the whole queue in a single pass.
1679        refs = self._old_queue
1680        self._old_queue = []
1681        all_old_chks = self._all_old_chks
1682        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1683            # TODO: Use StaticTuple here?
1684            self._all_old_items.update(items)
1685            refs = [r for _, r in prefix_refs if r not in all_old_chks]
1686            self._old_queue.extend(refs)
1687            all_old_chks.update(refs)
1688
1689    def _process_queues(self):
1690        while self._old_queue:
1691            self._process_next_old()
1692        return self._flush_new_queue()
1693
1694    def process(self):
1695        for record in self._read_all_roots():
1696            yield record, []
1697        for record, items in self._process_queues():
1698            yield record, items
1699
1700
1701def iter_interesting_nodes(store, interesting_root_keys,
1702                           uninteresting_root_keys, pb=None):
1703    """Given root keys, find interesting nodes.
1704
1705    Evaluate nodes referenced by interesting_root_keys. Ones that are also
1706    referenced from uninteresting_root_keys are not considered interesting.
1707
1708    :param interesting_root_keys: keys which should be part of the
1709        "interesting" nodes (which will be yielded)
1710    :param uninteresting_root_keys: keys which should be filtered out of the
1711        result set.
1712    :return: An iterator yielding
1713        (interesting record, [(interesting key, value), ...]) pairs.
1714    """
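    # A minimal usage sketch (hypothetical names for the store and root keys):
    #
    #   for record, items in iter_interesting_nodes(chk_store,
    #                                               new_root_keys,
    #                                               old_root_keys):
    #       ...  # stream record; items are the newly introduced (key, value)s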
1715    iterator = CHKMapDifference(store, interesting_root_keys,
1716                                uninteresting_root_keys,
1717                                search_key_func=store._search_key_func,
1718                                pb=pb)
1719    return iterator.process()
1720
1721
1722try:
1723    from ._chk_map_pyx import (
1724        _bytes_to_text_key,
1725        _search_key_16,
1726        _search_key_255,
1727        _deserialise_leaf_node,
1728        _deserialise_internal_node,
1729        )
1730except ImportError as e:
1731    osutils.failed_to_load_extension(e)
1732    from ._chk_map_py import (
1733        _bytes_to_text_key,
1734        _search_key_16,
1735        _search_key_255,
1736        _deserialise_leaf_node,
1737        _deserialise_internal_node,
1738        )  # noqa: F401
1739search_key_registry.register(b'hash-16-way', _search_key_16)
1740search_key_registry.register(b'hash-255-way', _search_key_255)
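# A hedged sketch of looking up a registered search key function (assuming the
# registry's standard get() lookup; the key tuple below is illustrative):
#
#   search_key_func = search_key_registry.get(b'hash-255-way')
#   search_key = search_key_func((b'file-id', b'revision-id'))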
1741
1742
1743def _check_key(key):
1744    """Helper function to assert that a key is properly formatted.
1745
1746    This generally shouldn't be used in production code, but it can be helpful
1747    to debug problems.
1748    """
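    # Illustrative example of a well-formed key: a one-element StaticTuple
    # holding the page's CHK, e.g.
    #   _check_key(StaticTuple(b'sha1:f572d396fae9206628714fb2ce00f72e94f2258f',))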
1749    if not isinstance(key, StaticTuple):
1750        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
1751    if len(key) != 1:
1752        raise ValueError('key %r should have length 1, not %d' %
1753                         (key, len(key),))
1754    if not isinstance(key[0], bytes):
1755        raise TypeError('key %r should hold bytes, not %r'
1756                        % (key, type(key[0])))
1757    if not key[0].startswith(b'sha1:'):
1758        raise ValueError('key %r should point to a sha1:' % (key,))
1759