# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Persistent maps from tuple_of_strings->string using CHK stores.

Overview and current status:

The CHKMap class implements a dict from tuple_of_strings->string by using a
trie with internal nodes of 8-bit fan out; the key tuples are mapped to
strings by joining them by \x00, and \x00 padding shorter keys out to the
length of the longest key. Leaf nodes are packed as densely as possible, and
internal nodes are all an additional 8 bits wide, leading to a sparse upper
tree.

Updates to a CHKMap are done preferentially via the apply_delta method, to
allow optimisation of the update operation; but individual map/unmap calls
are possible and supported. Individual changes via map/unmap are buffered in
memory until the _save method is called to force serialisation of the tree.
apply_delta records its changes immediately by performing an implicit _save.

TODO:
-----

Densely packed upper nodes.

"""

import heapq
import threading

from .. import (
    errors,
    lru_cache,
    osutils,
    registry,
    static_tuple,
    trace,
    )
from ..static_tuple import StaticTuple

# approx 4MB
# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4 * 1024 * 1024
# Per thread caches for 2 reasons:
# - in the server we may be serving very different content, so we get less
#   cache thrashing.
# - we avoid locking on every cache lookup.
_thread_caches = threading.local()
# The page cache.
_thread_caches.page_cache = None
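
# Illustrative note (not part of the original API docs): with the default
# 'plain' search keys, a key tuple is serialised by joining its elements with
# NUL bytes, e.g. (b'foo', b'bar') -> b'foo\x00bar'; shorter search keys are
# NUL-padded to a node's fixed width when used inside an InternalNode (see
# InternalNode._search_key below).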
71 """ 72 page_cache = getattr(_thread_caches, 'page_cache', None) 73 if page_cache is None: 74 # We are caching bytes so len(value) is perfectly accurate 75 page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE) 76 _thread_caches.page_cache = page_cache 77 return page_cache 78 79 80def clear_cache(): 81 _get_cache().clear() 82 83 84# If a ChildNode falls below this many bytes, we check for a remap 85_INTERESTING_NEW_SIZE = 50 86# If a ChildNode shrinks by more than this amount, we check for a remap 87_INTERESTING_SHRINKAGE_LIMIT = 20 88 89 90def _search_key_plain(key): 91 """Map the key tuple into a search string that just uses the key bytes.""" 92 return b'\x00'.join(key) 93 94 95search_key_registry = registry.Registry() 96search_key_registry.register(b'plain', _search_key_plain) 97 98 99class CHKMap(object): 100 """A persistent map from string to string backed by a CHK store.""" 101 102 __slots__ = ('_store', '_root_node', '_search_key_func') 103 104 def __init__(self, store, root_key, search_key_func=None): 105 """Create a CHKMap object. 106 107 :param store: The store the CHKMap is stored in. 108 :param root_key: The root key of the map. None to create an empty 109 CHKMap. 110 :param search_key_func: A function mapping a key => bytes. These bytes 111 are then used by the internal nodes to split up leaf nodes into 112 multiple pages. 113 """ 114 self._store = store 115 if search_key_func is None: 116 search_key_func = _search_key_plain 117 self._search_key_func = search_key_func 118 if root_key is None: 119 self._root_node = LeafNode(search_key_func=search_key_func) 120 else: 121 self._root_node = self._node_key(root_key) 122 123 def apply_delta(self, delta): 124 """Apply a delta to the map. 125 126 :param delta: An iterable of old_key, new_key, new_value tuples. 127 If new_key is not None, then new_key->new_value is inserted 128 into the map; if old_key is not None, then the old mapping 129 of old_key is removed. 130 """ 131 has_deletes = False 132 # Check preconditions first. 133 as_st = StaticTuple.from_sequence 134 new_items = {as_st(key) for (old, key, value) in delta 135 if key is not None and old is None} 136 existing_new = list(self.iteritems(key_filter=new_items)) 137 if existing_new: 138 raise errors.InconsistentDeltaDelta(delta, 139 "New items are already in the map %r." % existing_new) 140 # Now apply changes. 141 for old, new, value in delta: 142 if old is not None and old != new: 143 self.unmap(old, check_remap=False) 144 has_deletes = True 145 for old, new, value in delta: 146 if new is not None: 147 self.map(new, value) 148 if has_deletes: 149 self._check_remap() 150 return self._save() 151 152 def _ensure_root(self): 153 """Ensure that the root node is an object not a key.""" 154 if isinstance(self._root_node, StaticTuple): 155 # Demand-load the root 156 self._root_node = self._get_node(self._root_node) 157 158 def _get_node(self, node): 159 """Get a node. 160 161 Note that this does not update the _items dict in objects containing a 162 reference to this node. As such it does not prevent subsequent IO being 163 performed. 164 165 :param node: A tuple key or node object. 166 :return: A node object. 
167 """ 168 if isinstance(node, StaticTuple): 169 bytes = self._read_bytes(node) 170 return _deserialise(bytes, node, 171 search_key_func=self._search_key_func) 172 else: 173 return node 174 175 def _read_bytes(self, key): 176 try: 177 return _get_cache()[key] 178 except KeyError: 179 stream = self._store.get_record_stream([key], 'unordered', True) 180 bytes = next(stream).get_bytes_as('fulltext') 181 _get_cache()[key] = bytes 182 return bytes 183 184 def _dump_tree(self, include_keys=False, encoding='utf-8'): 185 """Return the tree in a string representation.""" 186 self._ensure_root() 187 def decode(x): return x.decode(encoding) 188 res = self._dump_tree_node(self._root_node, prefix=b'', indent='', 189 decode=decode, include_keys=include_keys) 190 res.append('') # Give a trailing '\n' 191 return '\n'.join(res) 192 193 def _dump_tree_node(self, node, prefix, indent, decode, include_keys=True): 194 """For this node and all children, generate a string representation.""" 195 result = [] 196 if not include_keys: 197 key_str = '' 198 else: 199 node_key = node.key() 200 if node_key is not None: 201 key_str = ' %s' % (decode(node_key[0]),) 202 else: 203 key_str = ' None' 204 result.append('%s%r %s%s' % (indent, decode(prefix), node.__class__.__name__, 205 key_str)) 206 if isinstance(node, InternalNode): 207 # Trigger all child nodes to get loaded 208 list(node._iter_nodes(self._store)) 209 for prefix, sub in sorted(node._items.items()): 210 result.extend(self._dump_tree_node(sub, prefix, indent + ' ', 211 decode=decode, include_keys=include_keys)) 212 else: 213 for key, value in sorted(node._items.items()): 214 # Don't use prefix nor indent here to line up when used in 215 # tests in conjunction with assertEqualDiff 216 result.append(' %r %r' % ( 217 tuple([decode(ke) for ke in key]), decode(value))) 218 return result 219 220 @classmethod 221 def from_dict(klass, store, initial_value, maximum_size=0, key_width=1, 222 search_key_func=None): 223 """Create a CHKMap in store with initial_value as the content. 224 225 :param store: The store to record initial_value in, a VersionedFiles 226 object with 1-tuple keys supporting CHK key generation. 227 :param initial_value: A dict to store in store. Its keys and values 228 must be bytestrings. 229 :param maximum_size: The maximum_size rule to apply to nodes. This 230 determines the size at which no new data is added to a single node. 231 :param key_width: The number of elements in each key_tuple being stored 232 in this map. 233 :param search_key_func: A function mapping a key => bytes. These bytes 234 are then used by the internal nodes to split up leaf nodes into 235 multiple pages. 236 :return: The root chk of the resulting CHKMap. 
237 """ 238 root_key = klass._create_directly(store, initial_value, 239 maximum_size=maximum_size, key_width=key_width, 240 search_key_func=search_key_func) 241 if not isinstance(root_key, StaticTuple): 242 raise AssertionError('we got a %s instead of a StaticTuple' 243 % (type(root_key),)) 244 return root_key 245 246 @classmethod 247 def _create_via_map(klass, store, initial_value, maximum_size=0, 248 key_width=1, search_key_func=None): 249 result = klass(store, None, search_key_func=search_key_func) 250 result._root_node.set_maximum_size(maximum_size) 251 result._root_node._key_width = key_width 252 delta = [] 253 for key, value in initial_value.items(): 254 delta.append((None, key, value)) 255 root_key = result.apply_delta(delta) 256 return root_key 257 258 @classmethod 259 def _create_directly(klass, store, initial_value, maximum_size=0, 260 key_width=1, search_key_func=None): 261 node = LeafNode(search_key_func=search_key_func) 262 node.set_maximum_size(maximum_size) 263 node._key_width = key_width 264 as_st = StaticTuple.from_sequence 265 node._items = dict((as_st(key), val) 266 for key, val in initial_value.items()) 267 node._raw_size = sum(node._key_value_len(key, value) 268 for key, value in node._items.items()) 269 node._len = len(node._items) 270 node._compute_search_prefix() 271 node._compute_serialised_prefix() 272 if (node._len > 1 and 273 maximum_size and 274 node._current_size() > maximum_size): 275 prefix, node_details = node._split(store) 276 if len(node_details) == 1: 277 raise AssertionError('Failed to split using node._split') 278 node = InternalNode(prefix, search_key_func=search_key_func) 279 node.set_maximum_size(maximum_size) 280 node._key_width = key_width 281 for split, subnode in node_details: 282 node.add_node(split, subnode) 283 keys = list(node.serialise(store)) 284 return keys[-1] 285 286 def iter_changes(self, basis): 287 """Iterate over the changes between basis and self. 288 289 :return: An iterator of tuples: (key, old_value, new_value). Old_value 290 is None for keys only in self; new_value is None for keys only in 291 basis. 292 """ 293 # Overview: 294 # Read both trees in lexographic, highest-first order. 295 # Any identical nodes we skip 296 # Any unique prefixes we output immediately. 297 # values in a leaf node are treated as single-value nodes in the tree 298 # which allows them to be not-special-cased. We know to output them 299 # because their value is a string, not a key(tuple) or node. 300 # 301 # corner cases to beware of when considering this function: 302 # *) common references are at different heights. 303 # consider two trees: 304 # {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}} 305 # {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'}, 306 # 'ab':LeafNode={'ab':'bar'}} 307 # 'b': LeafNode={'b'}} 308 # the node with aaa/aab will only be encountered in the second tree 309 # after reading the 'a' subtree, but it is encountered in the first 310 # tree immediately. Variations on this may have read internal nodes 311 # like this. we want to cut the entire pending subtree when we 312 # realise we have a common node. For this we use a list of keys - 313 # the path to a node - and check the entire path is clean as we 314 # process each item. 

    def iter_changes(self, basis):
        """Iterate over the changes between basis and self.

        :return: An iterator of tuples: (key, old_value, new_value). Old_value
            is None for keys only in self; new_value is None for keys only in
            basis.
        """
        # Overview:
        # Read both trees in lexicographic, highest-first order.
        # Any identical nodes we skip
        # Any unique prefixes we output immediately.
        # values in a leaf node are treated as single-value nodes in the tree
        # which allows them to be not-special-cased. We know to output them
        # because their value is a string, not a key(tuple) or node.
        #
        # corner cases to beware of when considering this function:
        # *) common references are at different heights.
        #    consider two trees:
        #    {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
        #    {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
        #                        'ab':LeafNode={'ab':'bar'}}
        #     'b': LeafNode={'b'}}
        #    the node with aaa/aab will only be encountered in the second tree
        #    after reading the 'a' subtree, but it is encountered in the first
        #    tree immediately. Variations on this may have read internal nodes
        #    like this. We want to cut the entire pending subtree when we
        #    realise we have a common node. For this we use a list of keys -
        #    the path to a node - and check the entire path is clean as we
        #    process each item.
        if self._node_key(self._root_node) == self._node_key(basis._root_node):
            return
        self._ensure_root()
        basis._ensure_root()
        excluded_keys = set()
        self_node = self._root_node
        basis_node = basis._root_node
        # A heap, each element is prefix, node(tuple/NodeObject/string),
        # key_path (a list of tuples, tail-sharing down the tree.)
        self_pending = []
        basis_pending = []

        def process_node(node, path, a_map, pending):
            # take a node and expand it
            node = a_map._get_node(node)
            if isinstance(node, LeafNode):
                path = (node._key, path)
                for key, value in node._items.items():
                    # For a LeafNode, the key is a serialized_key, rather than
                    # a search_key, but the heap is using search_keys
                    search_key = node._search_key_func(key)
                    heapq.heappush(pending, (search_key, key, value, path))
            else:
                # type(node) == InternalNode
                path = (node._key, path)
                for prefix, child in node._items.items():
                    heapq.heappush(pending, (prefix, None, child, path))

        def process_common_internal_nodes(self_node, basis_node):
            self_items = set(self_node._items.items())
            basis_items = set(basis_node._items.items())
            path = (self_node._key, None)
            for prefix, child in self_items - basis_items:
                heapq.heappush(self_pending, (prefix, None, child, path))
            path = (basis_node._key, None)
            for prefix, child in basis_items - self_items:
                heapq.heappush(basis_pending, (prefix, None, child, path))

        def process_common_leaf_nodes(self_node, basis_node):
            self_items = set(self_node._items.items())
            basis_items = set(basis_node._items.items())
            path = (self_node._key, None)
            for key, value in self_items - basis_items:
                prefix = self._search_key_func(key)
                heapq.heappush(self_pending, (prefix, key, value, path))
            path = (basis_node._key, None)
            for key, value in basis_items - self_items:
                prefix = basis._search_key_func(key)
                heapq.heappush(basis_pending, (prefix, key, value, path))

        def process_common_prefix_nodes(self_node, self_path,
                                        basis_node, basis_path):
            # Would it be more efficient if we could request both at the same
            # time?
            self_node = self._get_node(self_node)
            basis_node = basis._get_node(basis_node)
            if (isinstance(self_node, InternalNode) and
                    isinstance(basis_node, InternalNode)):
                # Matching internal nodes
                process_common_internal_nodes(self_node, basis_node)
            elif (isinstance(self_node, LeafNode) and
                  isinstance(basis_node, LeafNode)):
                process_common_leaf_nodes(self_node, basis_node)
            else:
                process_node(self_node, self_path, self, self_pending)
                process_node(basis_node, basis_path, basis, basis_pending)
        process_common_prefix_nodes(self_node, None, basis_node, None)
        self_seen = set()
        basis_seen = set()
        excluded_keys = set()
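
        # key_path is a linked list built from 2-tuples that share tails down
        # the tree; a node three levels deep carries, e.g.:
        #   (child_key, (parent_key, (root_key, None)))
        # check_excluded() below walks this chain to see whether any ancestor
        # has already been pruned as common.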

        def check_excluded(key_path):
            # Note that this is N^2, it depends on us trimming trees
            # aggressively to not become slow.
            # A better implementation would probably have a reverse map
            # back to the children of a node, and jump straight to it when
            # a common node is detected, then proceed to remove the already
            # pending children. breezy.graph has a searcher module with a
            # similar problem.
            while key_path is not None:
                key, key_path = key_path
                if key in excluded_keys:
                    return True
            return False

        loop_counter = 0
        while self_pending or basis_pending:
            loop_counter += 1
            if not self_pending:
                # self is exhausted: output remainder of basis
                for prefix, key, node, path in basis_pending:
                    if check_excluded(path):
                        continue
                    node = basis._get_node(node)
                    if key is not None:
                        # a value
                        yield (key, node, None)
                    else:
                        # subtree - fastpath the entire thing.
                        for key, value in node.iteritems(basis._store):
                            yield (key, value, None)
                return
            elif not basis_pending:
                # basis is exhausted: output remainder of self.
                for prefix, key, node, path in self_pending:
                    if check_excluded(path):
                        continue
                    node = self._get_node(node)
                    if key is not None:
                        # a value
                        yield (key, None, node)
                    else:
                        # subtree - fastpath the entire thing.
                        for key, value in node.iteritems(self._store):
                            yield (key, None, value)
                return
            else:
                # XXX: future optimisation - yield the smaller items
                # immediately rather than pushing everything on/off the
                # heaps. Applies to both internal nodes and leafnodes.
                if self_pending[0][0] < basis_pending[0][0]:
                    # expand self
                    prefix, key, node, path = heapq.heappop(self_pending)
                    if check_excluded(path):
                        continue
                    if key is not None:
                        # a value
                        yield (key, None, node)
                    else:
                        process_node(node, path, self, self_pending)
                    continue
                elif self_pending[0][0] > basis_pending[0][0]:
                    # expand basis
                    prefix, key, node, path = heapq.heappop(basis_pending)
                    if check_excluded(path):
                        continue
                    if key is not None:
                        # a value
                        yield (key, node, None)
                    else:
                        process_node(node, path, basis, basis_pending)
                    continue
                else:
                    # common prefix: possibly expand both
                    if self_pending[0][1] is None:
                        # process next self
                        read_self = True
                    else:
                        read_self = False
                    if basis_pending[0][1] is None:
                        # process next basis
                        read_basis = True
                    else:
                        read_basis = False
                    if not read_self and not read_basis:
                        # compare a common value
                        self_details = heapq.heappop(self_pending)
                        basis_details = heapq.heappop(basis_pending)
                        if self_details[2] != basis_details[2]:
                            yield (self_details[1],
                                   basis_details[2], self_details[2])
                        continue
                    # At least one side wasn't a simple value
                    if (self._node_key(self_pending[0][2])
                            == self._node_key(basis_pending[0][2])):
                        # Identical pointers, skip (and don't bother adding to
                        # excluded, it won't turn up again.)
                        heapq.heappop(self_pending)
                        heapq.heappop(basis_pending)
                        continue
                    # Now we need to expand this node before we can continue
                    if read_self and read_basis:
                        # Both sides start with the same prefix, so process
                        # them in parallel
                        self_prefix, _, self_node, self_path = heapq.heappop(
                            self_pending)
                        basis_prefix, _, basis_node, basis_path = heapq.heappop(
                            basis_pending)
                        if self_prefix != basis_prefix:
                            raise AssertionError(
                                '%r != %r' % (self_prefix, basis_prefix))
                        process_common_prefix_nodes(
                            self_node, self_path,
                            basis_node, basis_path)
                        continue
                    if read_self:
                        prefix, key, node, path = heapq.heappop(self_pending)
                        if check_excluded(path):
                            continue
                        process_node(node, path, self, self_pending)
                    if read_basis:
                        prefix, key, node, path = heapq.heappop(basis_pending)
                        if check_excluded(path):
                            continue
                        process_node(node, path, basis, basis_pending)
        # print loop_counter
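
    # Typical use of iter_changes(), assuming two maps backed by the same
    # store:
    #   for key, old_value, new_value in new_map.iter_changes(basis_map):
    #       ...  # old_value is None for additions, new_value is None for
    #            # removals; both are set (and differ) for modifications.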

    def iteritems(self, key_filter=None):
        """Iterate over the entire CHKMap's contents."""
        self._ensure_root()
        if key_filter is not None:
            as_st = StaticTuple.from_sequence
            key_filter = [as_st(key) for key in key_filter]
        return self._root_node.iteritems(self._store, key_filter=key_filter)

    def key(self):
        """Return the key for this map."""
        if isinstance(self._root_node, StaticTuple):
            return self._root_node
        else:
            return self._root_node._key

    def __len__(self):
        self._ensure_root()
        return len(self._root_node)

    def map(self, key, value):
        """Map a key tuple to value.

        :param key: A key to map.
        :param value: The value to assign to key.
        """
        key = StaticTuple.from_sequence(key)
        # Need a root object.
        self._ensure_root()
        prefix, node_details = self._root_node.map(self._store, key, value)
        if len(node_details) == 1:
            self._root_node = node_details[0][1]
        else:
            self._root_node = InternalNode(
                prefix, search_key_func=self._search_key_func)
            self._root_node.set_maximum_size(node_details[0][1].maximum_size)
            self._root_node._key_width = node_details[0][1]._key_width
            for split, node in node_details:
                self._root_node.add_node(split, node)

    def _node_key(self, node):
        """Get the key for a node whether it's a tuple or node."""
        if isinstance(node, tuple):
            node = StaticTuple.from_sequence(node)
        if isinstance(node, StaticTuple):
            return node
        else:
            return node._key

    def unmap(self, key, check_remap=True):
        """Remove key from the map."""
        key = StaticTuple.from_sequence(key)
        self._ensure_root()
        if isinstance(self._root_node, InternalNode):
            unmapped = self._root_node.unmap(self._store, key,
                                             check_remap=check_remap)
        else:
            unmapped = self._root_node.unmap(self._store, key)
        self._root_node = unmapped

    def _check_remap(self):
        """Check if nodes can be collapsed."""
        self._ensure_root()
        if isinstance(self._root_node, InternalNode):
            self._root_node = self._root_node._check_remap(self._store)

    def _save(self):
        """Save the map completely.

        :return: The key of the root node.
        """
        if isinstance(self._root_node, StaticTuple):
            # Already saved.
            return self._root_node
        keys = list(self._root_node.serialise(self._store))
        return keys[-1]
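
    # _root_node is either a StaticTuple key (unloaded or already saved) or a
    # node object (loaded, possibly dirty); _save() collapses the latter back
    # into a key, e.g.:
    #   chkmap.map((b'k3',), b'v3')   # buffered in memory only
    #   root_key = chkmap._save()     # serialises and returns the new root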


class Node(object):
    """Base class defining the protocol for CHK Map nodes.

    :ivar _raw_size: The total size of the serialized key:value data, before
        adding the header bytes, and without prefix compression.
    """

    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
                 )

    def __init__(self, key_width=1):
        """Create a node.

        :param key_width: The width of keys for this node.
        """
        self._key = None
        # Current number of elements
        self._len = 0
        self._maximum_size = 0
        self._key_width = key_width
        # current size in bytes
        self._raw_size = 0
        # The pointers/values this node has - meaning defined by child classes.
        self._items = {}
        # The common search prefix
        self._search_prefix = None

    def __repr__(self):
        items_str = str(sorted(self._items))
        if len(items_str) > 20:
            items_str = items_str[:16] + '...]'
        return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
            self.__class__.__name__, self._key, self._len, self._raw_size,
            self._maximum_size, self._search_prefix, items_str)

    def key(self):
        return self._key

    def __len__(self):
        return self._len

    @property
    def maximum_size(self):
        """What is the upper limit for adding references to a node."""
        return self._maximum_size

    def set_maximum_size(self, new_size):
        """Set the size threshold for nodes.

        :param new_size: The size at which no data is added to a node. 0 for
            unlimited.
        """
        self._maximum_size = new_size

    @classmethod
    def common_prefix(cls, prefix, key):
        """Given 2 strings, return the longest prefix common to both.

        :param prefix: This has been the common prefix for other keys, so it
            is more likely to be the common prefix in this case as well.
        :param key: Another string to compare to
        """
        if key.startswith(prefix):
            return prefix
        pos = -1
        # Is there a better way to do this?
        for pos, (left, right) in enumerate(zip(prefix, key)):
            if left != right:
                pos -= 1
                break
        common = prefix[:pos + 1]
        return common

    @classmethod
    def common_prefix_for_keys(cls, keys):
        """Given a list of keys, find their common prefix.

        :param keys: An iterable of strings.
        :return: The longest common prefix of all keys.
        """
        common_prefix = None
        for key in keys:
            if common_prefix is None:
                common_prefix = key
                continue
            common_prefix = cls.common_prefix(common_prefix, key)
            if not common_prefix:
                # if common_prefix is the empty string, then we know it won't
                # change further
                return b''
        return common_prefix


# Singleton indicating we have not computed _search_prefix yet
_unknown = object()
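
# Behaviour of the prefix helpers above, for reference:
#   Node.common_prefix(b'abcd', b'abce')                  # -> b'abc'
#   Node.common_prefix(b'ab', b'abcd')                    # -> b'ab'
#   Node.common_prefix_for_keys([b'abc', b'abd', b'ab'])  # -> b'ab'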
693 """ 694 695 __slots__ = ('_common_serialised_prefix',) 696 697 def __init__(self, search_key_func=None): 698 Node.__init__(self) 699 # All of the keys in this leaf node share this common prefix 700 self._common_serialised_prefix = None 701 if search_key_func is None: 702 self._search_key_func = _search_key_plain 703 else: 704 self._search_key_func = search_key_func 705 706 def __repr__(self): 707 items_str = str(sorted(self._items)) 708 if len(items_str) > 20: 709 items_str = items_str[:16] + '...]' 710 return \ 711 '%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \ 712 % (self.__class__.__name__, self._key, self._len, self._raw_size, 713 self._maximum_size, self._search_prefix, self._key_width, items_str) 714 715 def _current_size(self): 716 """Answer the current serialised size of this node. 717 718 This differs from self._raw_size in that it includes the bytes used for 719 the header. 720 """ 721 if self._common_serialised_prefix is None: 722 bytes_for_items = 0 723 prefix_len = 0 724 else: 725 # We will store a single string with the common prefix 726 # And then that common prefix will not be stored in any of the 727 # entry lines 728 prefix_len = len(self._common_serialised_prefix) 729 bytes_for_items = (self._raw_size - (prefix_len * self._len)) 730 return (9 + # 'chkleaf:\n' + 731 len(str(self._maximum_size)) + 1 + 732 len(str(self._key_width)) + 1 + 733 len(str(self._len)) + 1 + 734 prefix_len + 1 + 735 bytes_for_items) 736 737 @classmethod 738 def deserialise(klass, bytes, key, search_key_func=None): 739 """Deserialise bytes, with key key, into a LeafNode. 740 741 :param bytes: The bytes of the node. 742 :param key: The key that the serialised node has. 743 """ 744 key = static_tuple.expect_static_tuple(key) 745 return _deserialise_leaf_node(bytes, key, 746 search_key_func=search_key_func) 747 748 def iteritems(self, store, key_filter=None): 749 """Iterate over items in the node. 750 751 :param key_filter: A filter to apply to the node. It should be a 752 list/set/dict or similar repeatedly iterable container. 753 """ 754 if key_filter is not None: 755 # Adjust the filter - short elements go to a prefix filter. All 756 # other items are looked up directly. 757 # XXX: perhaps defaultdict? Profiling<rinse and repeat> 758 filters = {} 759 for key in key_filter: 760 if len(key) == self._key_width: 761 # This filter is meant to match exactly one key, yield it 762 # if we have it. 763 try: 764 yield key, self._items[key] 765 except KeyError: 766 # This key is not present in this map, continue 767 pass 768 else: 769 # Short items, we need to match based on a prefix 770 filters.setdefault(len(key), set()).add(key) 771 if filters: 772 filters_itemview = filters.items() 773 for item in self._items.items(): 774 for length, length_filter in filters_itemview: 775 if item[0][:length] in length_filter: 776 yield item 777 break 778 else: 779 yield from self._items.items() 780 781 def _key_value_len(self, key, value): 782 # TODO: Should probably be done without actually joining the key, but 783 # then that can be done via the C extension 784 return (len(self._serialise_key(key)) + 1 + 785 len(b'%d' % value.count(b'\n')) + 1 + 786 len(value) + 1) 787 788 def _search_key(self, key): 789 return self._search_key_func(key) 790 791 def _map_no_split(self, key, value): 792 """Map a key to a value. 793 794 This assumes either the key does not already exist, or you have already 795 removed its size and length from self. 

    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
        """Deserialise bytes, with key key, into a LeafNode.

        :param bytes: The bytes of the node.
        :param key: The key that the serialised node has.
        """
        key = static_tuple.expect_static_tuple(key)
        return _deserialise_leaf_node(bytes, key,
                                      search_key_func=search_key_func)

    def iteritems(self, store, key_filter=None):
        """Iterate over items in the node.

        :param key_filter: A filter to apply to the node. It should be a
            list/set/dict or similar repeatedly iterable container.
        """
        if key_filter is not None:
            # Adjust the filter - short elements go to a prefix filter. All
            # other items are looked up directly.
            # XXX: perhaps defaultdict? Profiling<rinse and repeat>
            filters = {}
            for key in key_filter:
                if len(key) == self._key_width:
                    # This filter is meant to match exactly one key, yield it
                    # if we have it.
                    try:
                        yield key, self._items[key]
                    except KeyError:
                        # This key is not present in this map, continue
                        pass
                else:
                    # Short items, we need to match based on a prefix
                    filters.setdefault(len(key), set()).add(key)
            if filters:
                filters_itemview = filters.items()
                for item in self._items.items():
                    for length, length_filter in filters_itemview:
                        if item[0][:length] in length_filter:
                            yield item
                            break
        else:
            yield from self._items.items()

    def _key_value_len(self, key, value):
        # TODO: Should probably be done without actually joining the key, but
        #       then that can be done via the C extension
        return (len(self._serialise_key(key)) + 1 +
                len(b'%d' % value.count(b'\n')) + 1 +
                len(value) + 1)

    def _search_key(self, key):
        return self._search_key_func(key)

    def _map_no_split(self, key, value):
        """Map a key to a value.

        This assumes either the key does not already exist, or you have
        already removed its size and length from self.

        :return: True if adding this node should cause us to split.
        """
        self._items[key] = value
        self._raw_size += self._key_value_len(key, value)
        self._len += 1
        serialised_key = self._serialise_key(key)
        if self._common_serialised_prefix is None:
            self._common_serialised_prefix = serialised_key
        else:
            self._common_serialised_prefix = self.common_prefix(
                self._common_serialised_prefix, serialised_key)
        search_key = self._search_key(key)
        if self._search_prefix is _unknown:
            self._compute_search_prefix()
        if self._search_prefix is None:
            self._search_prefix = search_key
        else:
            self._search_prefix = self.common_prefix(
                self._search_prefix, search_key)
        if (self._len > 1 and
                self._maximum_size and
                self._current_size() > self._maximum_size):
            # Check to see if all of the search_keys for this node are
            # identical. We allow the node to grow under that circumstance
            # (we could track this as common state, but it is infrequent)
            if (search_key != self._search_prefix or
                    not self._are_search_keys_identical()):
                return True
        return False
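
    # Overflow rule in brief: a leaf splits once it holds more than one key
    # and _current_size() exceeds maximum_size, *unless* every key in the
    # node shares the same search key (e.g. a hash collision), in which case
    # the node is allowed to keep growing.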

    def _split(self, store):
        """We have overflowed.

        Split this node into multiple LeafNodes, return it up the stack so
        that the next layer creates a new InternalNode and references the new
        nodes.

        :return: (common_serialised_prefix, [(node_serialised_prefix, node)])
        """
        if self._search_prefix is _unknown:
            raise AssertionError('Search prefix must be known')
        common_prefix = self._search_prefix
        split_at = len(common_prefix) + 1
        result = {}
        for key, value in self._items.items():
            search_key = self._search_key(key)
            prefix = search_key[:split_at]
            # TODO: Generally only 1 key can be exactly the right length,
            #       which means we can only have 1 key in the node pointed
            #       at by the 'prefix\0' key. We might want to consider
            #       folding it into the containing InternalNode rather than
            #       having a fixed length-1 node.
            #       Note this is probably not true for hash keys, as they
            #       may get a '\00' node anywhere, but won't have keys of
            #       different lengths.
            if len(prefix) < split_at:
                prefix += b'\x00' * (split_at - len(prefix))
            if prefix not in result:
                node = LeafNode(search_key_func=self._search_key_func)
                node.set_maximum_size(self._maximum_size)
                node._key_width = self._key_width
                result[prefix] = node
            else:
                node = result[prefix]
            sub_prefix, node_details = node.map(store, key, value)
            if len(node_details) > 1:
                if prefix != sub_prefix:
                    # This node has been split and is now found via a
                    # different path
                    result.pop(prefix)
                new_node = InternalNode(sub_prefix,
                                        search_key_func=self._search_key_func)
                new_node.set_maximum_size(self._maximum_size)
                new_node._key_width = self._key_width
                for split, node in node_details:
                    new_node.add_node(split, node)
                result[prefix] = new_node
        return common_prefix, list(result.items())

    def map(self, store, key, value):
        """Map key to value."""
        if key in self._items:
            self._raw_size -= self._key_value_len(key, self._items[key])
            self._len -= 1
        self._key = None
        if self._map_no_split(key, value):
            return self._split(store)
        else:
            if self._search_prefix is _unknown:
                raise AssertionError('%r must be known' % self._search_prefix)
            return self._search_prefix, [(b"", self)]

    _serialise_key = b'\x00'.join

    def serialise(self, store):
        """Serialise the LeafNode to store.

        :param store: A VersionedFiles honouring the CHK extensions.
        :return: An iterable of the keys inserted by this operation.
        """
        lines = [b"chkleaf:\n"]
        lines.append(b"%d\n" % self._maximum_size)
        lines.append(b"%d\n" % self._key_width)
        lines.append(b"%d\n" % self._len)
        if self._common_serialised_prefix is None:
            lines.append(b'\n')
            if len(self._items) != 0:
                raise AssertionError('If _common_serialised_prefix is None'
                                     ' we should have no items')
        else:
            lines.append(b'%s\n' % (self._common_serialised_prefix,))
            prefix_len = len(self._common_serialised_prefix)
        for key, value in sorted(self._items.items()):
            # Always add a final newline
            value_lines = osutils.chunks_to_lines([value + b'\n'])
            serialized = b"%s\x00%d\n" % (self._serialise_key(key),
                                          len(value_lines))
            if not serialized.startswith(self._common_serialised_prefix):
                raise AssertionError('We thought the common prefix was %r'
                                     ' but entry %r does not have it in common'
                                     % (self._common_serialised_prefix,
                                        serialized))
            lines.append(serialized[prefix_len:])
            lines.extend(value_lines)
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = StaticTuple(b"sha1:" + sha1,).intern()
        data = b''.join(lines)
        if len(data) != self._current_size():
            raise AssertionError('Invalid _current_size')
        _get_cache()[self._key] = data
        return [self._key]

    def refs(self):
        """Return the references to other CHK's held by this node."""
        return []
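
    # Worked example: a leaf holding {(b'foo', b'1'): b'bar\nbaz'} with
    # maximum_size=10 and key_width=2 serialises as:
    #   b'chkleaf:\n10\n2\n1\nfoo\x001\n\x002\nbar\nbaz\n'
    # (the common serialised prefix b'foo\x001' is written once, and each
    # entry line carries only its suffix plus the value line count).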

    def _compute_search_prefix(self):
        """Determine the common search prefix for all keys in this node.

        :return: A bytestring of the longest search key prefix that is
            unique within this node.
        """
        search_keys = [self._search_key_func(key) for key in self._items]
        self._search_prefix = self.common_prefix_for_keys(search_keys)
        return self._search_prefix

    def _are_search_keys_identical(self):
        """Check to see if the search keys for all entries are the same.

        When using a hash as the search_key it is possible for non-identical
        keys to collide. If that happens enough, we may try to overflow a
        LeafNode, but as all are collisions, we must not split.
        """
        common_search_key = None
        for key in self._items:
            search_key = self._search_key(key)
            if common_search_key is None:
                common_search_key = search_key
            elif search_key != common_search_key:
                return False
        return True

    def _compute_serialised_prefix(self):
        """Determine the common prefix for serialised keys in this node.

        :return: A bytestring of the longest serialised key prefix that is
            unique within this node.
        """
        serialised_keys = [self._serialise_key(key) for key in self._items]
        self._common_serialised_prefix = self.common_prefix_for_keys(
            serialised_keys)
        return self._common_serialised_prefix

    def unmap(self, store, key):
        """Unmap key from the node."""
        try:
            self._raw_size -= self._key_value_len(key, self._items[key])
        except KeyError:
            trace.mutter("key %s not found in %r", key, self._items)
            raise
        self._len -= 1
        del self._items[key]
        self._key = None
        # Recompute from scratch
        self._compute_search_prefix()
        self._compute_serialised_prefix()
        return self
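
# In the InternalNode class below, _items maps fixed-width search-key
# prefixes to children, which may still be unloaded; e.g. with
# _node_width == 2:
#   {b'aa': <LeafNode ...>, b'ab': StaticTuple(b'sha1:...',)}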
1042 """ 1043 key = static_tuple.expect_static_tuple(key) 1044 return _deserialise_internal_node(bytes, key, 1045 search_key_func=search_key_func) 1046 1047 def iteritems(self, store, key_filter=None): 1048 for node, node_filter in self._iter_nodes(store, key_filter=key_filter): 1049 for item in node.iteritems(store, key_filter=node_filter): 1050 yield item 1051 1052 def _iter_nodes(self, store, key_filter=None, batch_size=None): 1053 """Iterate over node objects which match key_filter. 1054 1055 :param store: A store to use for accessing content. 1056 :param key_filter: A key filter to filter nodes. Only nodes that might 1057 contain a key in key_filter will be returned. 1058 :param batch_size: If not None, then we will return the nodes that had 1059 to be read using get_record_stream in batches, rather than reading 1060 them all at once. 1061 :return: An iterable of nodes. This function does not have to be fully 1062 consumed. (There will be no pending I/O when items are being returned.) 1063 """ 1064 # Map from chk key ('sha1:...',) to (prefix, key_filter) 1065 # prefix is the key in self._items to use, key_filter is the key_filter 1066 # entries that would match this node 1067 keys = {} 1068 shortcut = False 1069 if key_filter is None: 1070 # yielding all nodes, yield whatever we have, and queue up a read 1071 # for whatever we are missing 1072 shortcut = True 1073 for prefix, node in self._items.items(): 1074 if node.__class__ is StaticTuple: 1075 keys[node] = (prefix, None) 1076 else: 1077 yield node, None 1078 elif len(key_filter) == 1: 1079 # Technically, this path could also be handled by the first check 1080 # in 'self._node_width' in length_filters. However, we can handle 1081 # this case without spending any time building up the 1082 # prefix_to_keys, etc state. 

    def _iter_nodes(self, store, key_filter=None, batch_size=None):
        """Iterate over node objects which match key_filter.

        :param store: A store to use for accessing content.
        :param key_filter: A key filter to filter nodes. Only nodes that might
            contain a key in key_filter will be returned.
        :param batch_size: If not None, then we will return the nodes that had
            to be read using get_record_stream in batches, rather than reading
            them all at once.
        :return: An iterable of nodes. This function does not have to be fully
            consumed.  (There will be no pending I/O when items are being
            returned.)
        """
        # Map from chk key ('sha1:...',) to (prefix, key_filter)
        # prefix is the key in self._items to use, key_filter is the key_filter
        # entries that would match this node
        keys = {}
        shortcut = False
        if key_filter is None:
            # yielding all nodes, yield whatever we have, and queue up a read
            # for whatever we are missing
            shortcut = True
            for prefix, node in self._items.items():
                if node.__class__ is StaticTuple:
                    keys[node] = (prefix, None)
                else:
                    yield node, None
        elif len(key_filter) == 1:
            # Technically, this path could also be handled by the first check
            # in 'self._node_width' in length_filters. However, we can handle
            # this case without spending any time building up the
            # prefix_to_keys, etc state.

            # This is a bit ugly, but TIMEIT showed it to be by far the
            # fastest:
            #   0.626us   list(key_filter)[0]
            #       is a func() for list(), 2 mallocs, and a getitem
            #   0.489us   [k for k in key_filter][0]
            #       still has the mallocs, avoids the func() call
            #   0.350us   iter(key_filter).next()
            #       has a func() call, and mallocs an iterator
            #   0.125us   for key in key_filter: pass
            #       no func() overhead, might malloc an iterator
            #   0.105us   for key in key_filter: break
            #       no func() overhead, might malloc an iterator, probably
            #       avoids checking an 'else' clause as part of the for
            for key in key_filter:
                break
            search_prefix = self._search_prefix_filter(key)
            if len(search_prefix) == self._node_width:
                # This item will match exactly, so just do a dict lookup, and
                # see what we can return
                shortcut = True
                try:
                    node = self._items[search_prefix]
                except KeyError:
                    # A given key can only match 1 child node, if it isn't
                    # there, then we can just return nothing
                    return
                if node.__class__ is StaticTuple:
                    keys[node] = (search_prefix, [key])
                else:
                    # This is loaded, and the only thing that can match,
                    # return
                    yield node, [key]
                    return
        if not shortcut:
            # First, convert all keys into a list of search prefixes
            # Aggregate common prefixes, and track the keys they come from
            prefix_to_keys = {}
            length_filters = {}
            for key in key_filter:
                search_prefix = self._search_prefix_filter(key)
                length_filter = length_filters.setdefault(
                    len(search_prefix), set())
                length_filter.add(search_prefix)
                prefix_to_keys.setdefault(search_prefix, []).append(key)

            if (self._node_width in length_filters and
                    len(length_filters) == 1):
                # all of the search prefixes match exactly _node_width. This
                # means that everything is an exact match, and we can do a
                # lookup into self._items, rather than iterating over the
                # items dict.
                search_prefixes = length_filters[self._node_width]
                for search_prefix in search_prefixes:
                    try:
                        node = self._items[search_prefix]
                    except KeyError:
                        # We can ignore this one
                        continue
                    node_key_filter = prefix_to_keys[search_prefix]
                    if node.__class__ is StaticTuple:
                        keys[node] = (search_prefix, node_key_filter)
                    else:
                        yield node, node_key_filter
            else:
                # The slow way. We walk every item in self._items, and check
                # to see if there are any matches
                length_filters_itemview = length_filters.items()
                for prefix, node in self._items.items():
                    node_key_filter = []
                    for length, length_filter in length_filters_itemview:
                        sub_prefix = prefix[:length]
                        if sub_prefix in length_filter:
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
                    if node_key_filter:  # this key matched something, yield it
                        if node.__class__ is StaticTuple:
                            keys[node] = (prefix, node_key_filter)
                        else:
                            yield node, node_key_filter
        if keys:
            # Look in the page cache for some more bytes
            found_keys = set()
            for key in keys:
                try:
                    bytes = _get_cache()[key]
                except KeyError:
                    continue
                else:
                    node = _deserialise(bytes, key,
                                        search_key_func=self._search_key_func)
                    prefix, node_key_filter = keys[key]
                    self._items[prefix] = node
                    found_keys.add(key)
                    yield node, node_key_filter
            for key in found_keys:
                del keys[key]
        if keys:
            # demand load some pages.
            if batch_size is None:
                # Read all the keys in
                batch_size = len(keys)
            key_order = list(keys)
            for batch_start in range(0, len(key_order), batch_size):
                batch = key_order[batch_start:batch_start + batch_size]
                # We have to fully consume the stream so there is no pending
                # I/O, so we buffer the nodes for now.
                stream = store.get_record_stream(batch, 'unordered', True)
                node_and_filters = []
                for record in stream:
                    bytes = record.get_bytes_as('fulltext')
                    node = _deserialise(bytes, record.key,
                                        search_key_func=self._search_key_func)
                    prefix, node_key_filter = keys[record.key]
                    node_and_filters.append((node, node_key_filter))
                    self._items[prefix] = node
                    _get_cache()[record.key] = bytes
                for info in node_and_filters:
                    yield info
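
    # key_filter matching in brief: with _node_width == 2, a filter key whose
    # search key is b'a' (shorter than the node width) acts as a prefix
    # filter and may match several children (b'aa', b'ab', ...), while a
    # search key truncated to the full width can match at most one child and
    # is resolved with a direct dict lookup.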

    def map(self, store, key, value):
        """Map key to value."""
        if not len(self._items):
            raise AssertionError("can't map in an empty InternalNode.")
        search_key = self._search_key(key)
        if self._node_width != len(self._search_prefix) + 1:
            raise AssertionError("node width mismatch: %d is not %d" %
                                 (self._node_width,
                                  len(self._search_prefix) + 1))
        if not search_key.startswith(self._search_prefix):
            # This key doesn't fit in this index, so we need to split at the
            # point where it would fit, insert self into that internal node,
            # and then map this key into that node.
            new_prefix = self.common_prefix(self._search_prefix,
                                            search_key)
            new_parent = InternalNode(new_prefix,
                                      search_key_func=self._search_key_func)
            new_parent.set_maximum_size(self._maximum_size)
            new_parent._key_width = self._key_width
            new_parent.add_node(self._search_prefix[:len(new_prefix) + 1],
                                self)
            return new_parent.map(store, key, value)
        children = [node for node, _ in self._iter_nodes(
            store, key_filter=[key])]
        if children:
            child = children[0]
        else:
            # new child needed:
            child = self._new_child(search_key, LeafNode)
        old_len = len(child)
        if isinstance(child, LeafNode):
            old_size = child._current_size()
        else:
            old_size = None
        prefix, node_details = child.map(store, key, value)
        if len(node_details) == 1:
            # child may have shrunk, or might be a new node
            child = node_details[0][1]
            self._len = self._len - old_len + len(child)
            self._items[search_key] = child
            self._key = None
            new_node = self
            if isinstance(child, LeafNode):
                if old_size is None:
                    # The old node was an InternalNode which means it has now
                    # collapsed, so we need to check if it will chain to a
                    # collapse at this level.
                    trace.mutter("checking remap as InternalNode -> LeafNode")
                    new_node = self._check_remap(store)
                else:
                    # If the LeafNode has shrunk in size, we may want to run
                    # a remap check. Checking for a remap is expensive though
                    # and the frequency of a successful remap is very low.
                    # Shrinkage by small amounts is common, so we only do the
                    # remap check if the new_size is low or the shrinkage
                    # amount is over a configurable limit.
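                    # (The limits are _INTERESTING_NEW_SIZE and
                    # _INTERESTING_SHRINKAGE_LIMIT, defined near the top of
                    # this module.)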
                    new_size = child._current_size()
                    shrinkage = old_size - new_size
                    if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE or
                            shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
                        trace.mutter(
                            "checking remap as size shrunk by %d to be %d",
                            shrinkage, new_size)
                        new_node = self._check_remap(store)
            if new_node._search_prefix is None:
                raise AssertionError("_search_prefix should not be None")
            return new_node._search_prefix, [(b'', new_node)]
        # child has overflown - create a new intermediate node.
        # XXX: This is where we might want to try and expand our depth
        # to refer to more bytes of every child (which would give us
        # multiple pointers to child nodes, but less intermediate nodes)
        child = self._new_child(search_key, InternalNode)
        child._search_prefix = prefix
        for split, node in node_details:
            child.add_node(split, node)
        self._len = self._len - old_len + len(child)
        self._key = None
        return self._search_prefix, [(b"", self)]

    def _new_child(self, search_key, klass):
        """Create a new child node of type klass."""
        child = klass()
        child.set_maximum_size(self._maximum_size)
        child._key_width = self._key_width
        child._search_key_func = self._search_key_func
        self._items[search_key] = child
        return child

    def serialise(self, store):
        """Serialise the node to store.

        :param store: A VersionedFiles honouring the CHK extensions.
        :return: An iterable of the keys inserted by this operation.
        """
        for node in self._items.values():
            if isinstance(node, StaticTuple):
                # Never deserialised.
                continue
            if node._key is not None:
                # Never altered
                continue
            for key in node.serialise(store):
                yield key
        lines = [b"chknode:\n"]
        lines.append(b"%d\n" % self._maximum_size)
        lines.append(b"%d\n" % self._key_width)
        lines.append(b"%d\n" % self._len)
        if self._search_prefix is None:
            raise AssertionError("_search_prefix should not be None")
        lines.append(b'%s\n' % (self._search_prefix,))
        prefix_len = len(self._search_prefix)
        for prefix, node in sorted(self._items.items()):
            if isinstance(node, StaticTuple):
                key = node[0]
            else:
                key = node._key[0]
            serialised = b"%s\x00%s\n" % (prefix, key)
            if not serialised.startswith(self._search_prefix):
                raise AssertionError("prefixes mismatch: %s must start with %s"
                                     % (serialised, self._search_prefix))
            lines.append(serialised[prefix_len:])
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = StaticTuple(b"sha1:" + sha1,).intern()
        _get_cache()[self._key] = b''.join(lines)
        yield self._key
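
    # Worked example (with illustrative sha1s): an internal node with search
    # prefix b'a', children at b'aa' and b'ab', maximum_size=4096,
    # key_width=1 and 12 keys below it serialises as:
    #   b'chknode:\n4096\n1\n12\na\na\x00sha1:xxx...\nb\x00sha1:yyy...\n'
    # (each child line drops the common prefix b'a').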
1342 """ 1343 if offset >= self._node_width: 1344 for node in valueview(self._items): 1345 for result in node._split(offset): 1346 yield result 1347 1348 def refs(self): 1349 """Return the references to other CHK's held by this node.""" 1350 if self._key is None: 1351 raise AssertionError("unserialised nodes have no refs.") 1352 refs = [] 1353 for value in self._items.values(): 1354 if isinstance(value, StaticTuple): 1355 refs.append(value) 1356 else: 1357 refs.append(value.key()) 1358 return refs 1359 1360 def _compute_search_prefix(self, extra_key=None): 1361 """Return the unique key prefix for this node. 1362 1363 :return: A bytestring of the longest search key prefix that is 1364 unique within this node. 1365 """ 1366 self._search_prefix = self.common_prefix_for_keys(self._items) 1367 return self._search_prefix 1368 1369 def unmap(self, store, key, check_remap=True): 1370 """Remove key from this node and its children.""" 1371 if not len(self._items): 1372 raise AssertionError("can't unmap in an empty InternalNode.") 1373 children = [node for node, _ 1374 in self._iter_nodes(store, key_filter=[key])] 1375 if children: 1376 child = children[0] 1377 else: 1378 raise KeyError(key) 1379 self._len -= 1 1380 unmapped = child.unmap(store, key) 1381 self._key = None 1382 search_key = self._search_key(key) 1383 if len(unmapped) == 0: 1384 # All child nodes are gone, remove the child: 1385 del self._items[search_key] 1386 unmapped = None 1387 else: 1388 # Stash the returned node 1389 self._items[search_key] = unmapped 1390 if len(self._items) == 1: 1391 # this node is no longer needed: 1392 return list(self._items.values())[0] 1393 if isinstance(unmapped, InternalNode): 1394 return self 1395 if check_remap: 1396 return self._check_remap(store) 1397 else: 1398 return self 1399 1400 def _check_remap(self, store): 1401 """Check if all keys contained by children fit in a single LeafNode. 1402 1403 :param store: A store to use for reading more nodes 1404 :return: Either self, or a new LeafNode which should replace self. 1405 """ 1406 # Logic for how we determine when we need to rebuild 1407 # 1) Implicitly unmap() is removing a key which means that the child 1408 # nodes are going to be shrinking by some extent. 1409 # 2) If all children are LeafNodes, it is possible that they could be 1410 # combined into a single LeafNode, which can then completely replace 1411 # this internal node with a single LeafNode 1412 # 3) If *one* child is an InternalNode, we assume it has already done 1413 # all the work to determine that its children cannot collapse, and 1414 # we can then assume that those nodes *plus* the current nodes don't 1415 # have a chance of collapsing either. 1416 # So a very cheap check is to just say if 'unmapped' is an 1417 # InternalNode, we don't have to check further. 1418 1419 # TODO: Another alternative is to check the total size of all known 1420 # LeafNodes. If there is some formula we can use to determine the 1421 # final size without actually having to read in any more 1422 # children, it would be nice to have. However, we have to be 1423 # careful with stuff like nodes that pull out the common prefix 1424 # of each key, as adding a new key can change the common prefix 1425 # and cause size changes greater than the length of one key. 

    def unmap(self, store, key, check_remap=True):
        """Remove key from this node and its children."""
        if not len(self._items):
            raise AssertionError("can't unmap in an empty InternalNode.")
        children = [node for node, _
                    in self._iter_nodes(store, key_filter=[key])]
        if children:
            child = children[0]
        else:
            raise KeyError(key)
        self._len -= 1
        unmapped = child.unmap(store, key)
        self._key = None
        search_key = self._search_key(key)
        if len(unmapped) == 0:
            # All child nodes are gone, remove the child:
            del self._items[search_key]
            unmapped = None
        else:
            # Stash the returned node
            self._items[search_key] = unmapped
        if len(self._items) == 1:
            # this node is no longer needed:
            return list(self._items.values())[0]
        if isinstance(unmapped, InternalNode):
            return self
        if check_remap:
            return self._check_remap(store)
        else:
            return self

    def _check_remap(self, store):
        """Check if all keys contained by children fit in a single LeafNode.

        :param store: A store to use for reading more nodes
        :return: Either self, or a new LeafNode which should replace self.
        """
        # Logic for how we determine when we need to rebuild
        # 1) Implicitly unmap() is removing a key which means that the child
        #    nodes are going to be shrinking by some extent.
        # 2) If all children are LeafNodes, it is possible that they could be
        #    combined into a single LeafNode, which can then completely
        #    replace this internal node with a single LeafNode
        # 3) If *one* child is an InternalNode, we assume it has already done
        #    all the work to determine that its children cannot collapse, and
        #    we can then assume that those nodes *plus* the current nodes
        #    don't have a chance of collapsing either.
        #    So a very cheap check is to just say if 'unmapped' is an
        #    InternalNode, we don't have to check further.

        # TODO: Another alternative is to check the total size of all known
        #       LeafNodes. If there is some formula we can use to determine
        #       the final size without actually having to read in any more
        #       children, it would be nice to have. However, we have to be
        #       careful with stuff like nodes that pull out the common prefix
        #       of each key, as adding a new key can change the common prefix
        #       and cause size changes greater than the length of one key.
        #       So for now, we just add everything to a new Leaf until it
        #       splits, as we know that will give the right answer
        new_leaf = LeafNode(search_key_func=self._search_key_func)
        new_leaf.set_maximum_size(self._maximum_size)
        new_leaf._key_width = self._key_width
        # A batch_size of 16 was chosen because:
        #  a) In testing, a 4k page held 14 times. So if we have more than 16
        #     leaf nodes we are unlikely to hold them in a single new leaf
        #     node. This still allows for 1 round trip
        #  b) With 16-way fan out, we can still do a single round trip
        #  c) With 255-way fan out, we don't want to read all 255 and destroy
        #     the page cache, just to determine that we really don't need it.
        for node, _ in self._iter_nodes(store, batch_size=16):
            if isinstance(node, InternalNode):
                # Without looking at any leaf nodes, we are sure
                return self
            for key, value in node._items.items():
                if new_leaf._map_no_split(key, value):
                    return self
        trace.mutter("remap generated a new LeafNode")
        return new_leaf


def _deserialise(data, key, search_key_func):
    """Helper for repositorydetails - convert bytes to a node."""
    if data.startswith(b"chkleaf:\n"):
        node = LeafNode.deserialise(data, key, search_key_func=search_key_func)
    elif data.startswith(b"chknode:\n"):
        node = InternalNode.deserialise(data, key,
                                        search_key_func=search_key_func)
    else:
        raise AssertionError("Unknown node type.")
    return node
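
# Typical use of the CHKMapDifference class defined below (usually via
# iter_interesting_nodes() at the bottom of this module):
#   diff = CHKMapDifference(store, new_root_keys, old_root_keys,
#                           search_key_func=_search_key_plain)
#   for record, items in diff.process():
#       ...  # records are pages unique to the new maps; items are the
#            # (key, value) pairs present only in the new maps.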


class CHKMapDifference(object):
    """Iterate the stored pages and key,value pairs for (new - old).

    This class provides a generator over the stored CHK pages and the
    (key, value) pairs that are in any of the new maps and not in any of the
    old maps.

    Note that it may yield chk pages that are common (especially root nodes),
    but it won't yield (key,value) pairs that are common.
    """

    def __init__(self, store, new_root_keys, old_root_keys,
                 search_key_func, pb=None):
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
        #       force callers to use StaticTuple, because there will often be
        #       lots of keys passed in here. And even if we cast it locally,
        #       that just means that we will have *both* a StaticTuple and a
        #       tuple() in memory, referring to the same object. (so a net
        #       increase in memory, not a decrease.)
        self._store = store
        self._new_root_keys = new_root_keys
        self._old_root_keys = old_root_keys
        self._pb = pb
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # processing
        # TODO: This might grow to a large size if there are lots of merge
        #       parents, etc. However, it probably doesn't scale to O(history)
        #       like _processed_new_refs does.
        self._all_old_chks = set(self._old_root_keys)
        # All items that we have seen from the old_root_keys
        self._all_old_items = set()
        # These are interesting items which were either read, or already in
        # the interesting queue (so we don't need to walk them again)
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
        #       SimpleSet here.
        self._processed_new_refs = set()
        self._search_key_func = search_key_func

        # The uninteresting and interesting nodes to be searched
        self._old_queue = []
        self._new_queue = []
        # Holds the (key, value) items found when processing the root nodes,
        # waiting for the uninteresting nodes to be walked
        self._new_item_queue = []
        self._state = None

    def _read_nodes_from_store(self, keys):
        # We chose not to use _get_cache(), because we think in
        # terms of records to be yielded. Also, we expect to touch each page
        # only 1 time during this code. (We may want to evaluate saving the
        # raw bytes into the page cache, which would allow a working tree
        # update after the fetch to not have to read the bytes again.)
        as_st = StaticTuple.from_sequence
        stream = self._store.get_record_stream(keys, 'unordered', True)
        for record in stream:
            if self._pb is not None:
                self._pb.tick()
            if record.storage_kind == 'absent':
                raise errors.NoSuchRevision(self._store, record.key)
            bytes = record.get_bytes_as('fulltext')
            node = _deserialise(bytes, record.key,
                                search_key_func=self._search_key_func)
            if isinstance(node, InternalNode):
                # Note we don't have to do node.refs() because we know that
                # there are no children that have been pushed into this node
                # Note: Using as_st() here seemed to save 1.2MB, which would
                #       indicate that we keep 100k prefix_refs around while
                #       processing. They *should* be shorter lived than that...
                #       It does cost us ~10s of processing time
                prefix_refs = list(node._items.items())
                items = []
            else:
                prefix_refs = []
                # Note: We don't use a StaticTuple here. Profiling showed a
                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
                #       But a significant slowdown (15s / 145s, or 10%)
                items = list(node._items.items())
            yield record, node, prefix_refs, items

    def _read_old_roots(self):
        old_chks_to_enqueue = []
        all_old_chks = self._all_old_chks
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(self._old_root_keys):
            # Uninteresting node
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in all_old_chks]
            new_refs = [p_r[1] for p_r in prefix_refs]
            all_old_chks.update(new_refs)
            # TODO: This might be a good time to turn items into StaticTuple
            #       instances and possibly intern them. However, this does not
            #       impact 'initial branch' performance, so I'm not worrying
            #       about this yet
            self._all_old_items.update(items)
            # Queue up the uninteresting references
            # Don't actually put them in the 'to-read' queue until we have
            # finished checking the interesting references
            old_chks_to_enqueue.extend(prefix_refs)
        return old_chks_to_enqueue

    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
        # At this point, we have read all the uninteresting and interesting
        # items, so we can queue up the uninteresting stuff, knowing that
        # we've handled the interesting ones
        for prefix, ref in old_chks_to_enqueue:
            not_interesting = True
            for i in range(len(prefix), 0, -1):
                if prefix[:i] in new_prefixes:
                    not_interesting = False
                    break
            if not_interesting:
                # This prefix is not part of the remaining 'interesting set'
                continue
            self._old_queue.append(ref)

    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
        # At this point, we have read all the uninteresting and interesting
        # items, so we can queue up the uninteresting stuff, knowing that
        # we've handled the interesting ones
        for prefix, ref in old_chks_to_enqueue:
            not_interesting = True
            for i in range(len(prefix), 0, -1):
                if prefix[:i] in new_prefixes:
                    not_interesting = False
                    break
            if not_interesting:
                # This prefix is not part of the remaining 'interesting set'
                continue
            self._old_queue.append(ref)

    def _read_all_roots(self):
        """Read the root pages.

        This is structured as a generator, so that the root records can be
        yielded up to whoever needs them without any buffering.
        """
        # This is the bootstrap phase
        if not self._old_root_keys:
            # With no old_root_keys we can just shortcut and be ready
            # for _flush_new_queue
            self._new_queue = list(self._new_root_keys)
            return
        old_chks_to_enqueue = self._read_old_roots()
        # Filter out any root keys that are already known to be uninteresting
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
        # The prefixes we have seen under new_keys; anything we will yield
        # lives under one of these
        new_prefixes = set()
        # We are about to yield all of these, so we don't want them getting
        # added a second time
        processed_new_refs = self._processed_new_refs
        processed_new_refs.update(new_keys)
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(new_keys):
            # At this level, we now know all the uninteresting references,
            # so we filter and queue up whatever is remaining
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in self._all_old_chks and
                           p_r[1] not in processed_new_refs]
            refs = [p_r[1] for p_r in prefix_refs]
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
            self._new_queue.extend(refs)
            # TODO: We can potentially get multiple items here, however the
            #       current design allows for this, as callers will do the
            #       work to make the results unique. We might profile whether
            #       we gain anything by ensuring unique return values for items
            # TODO: This might be a good time to cast to StaticTuple, as
            #       self._new_item_queue will hold the contents of multiple
            #       records for an extended lifetime
            new_items = [item for item in items
                         if item not in self._all_old_items]
            self._new_item_queue.extend(new_items)
            new_prefixes.update([self._search_key_func(item[0])
                                 for item in new_items])
            processed_new_refs.update(refs)
            yield record
        # For new_prefixes we have the full length prefixes queued up.
        # However, we also need possible prefixes. (If we have a known ref to
        # 'ab', then we also need to include 'a'.) So expand the
        # new_prefixes to include all shorter prefixes
        for prefix in list(new_prefixes):
            new_prefixes.update([prefix[:i] for i in range(1, len(prefix))])
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
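
    # Ordering note (comment only): _process_queues, below, drains
    # _old_queue completely before _flush_new_queue runs. Until every old
    # page has been walked, _all_old_items is still growing, so filtering
    # new items against it any earlier could leak (key, value) pairs that
    # later turn out to be common to both sides.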

    def _flush_new_queue(self):
        # No need to maintain the heap invariant anymore, just pull things out
        # and process them
        refs = set(self._new_queue)
        self._new_queue = []
        # First pass: flush all interesting items and convert to direct refs
        all_old_chks = self._all_old_chks
        processed_new_refs = self._processed_new_refs
        all_old_items = self._all_old_items
        new_items = [item for item in self._new_item_queue
                     if item not in all_old_items]
        self._new_item_queue = []
        if new_items:
            yield None, new_items
        refs = refs.difference(all_old_chks)
        processed_new_refs.update(refs)
        while refs:
            # TODO: Using a SimpleSet for self._processed_new_refs saved as
            #       much as 10MB of peak memory. However, it requires
            #       implementing a non-pyrex version.
            next_refs = set()
            next_refs_update = next_refs.update
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
            # from 1m54s to 1m51s. Consider it.
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
                if all_old_items:
                    # Using the 'if' check saves about 4s (145s => 141s) when
                    # streaming an initial branch of Launchpad data.
                    items = [item for item in items
                             if item not in all_old_items]
                yield record, items
                next_refs_update([p_r[1] for p_r in p_refs])
                del p_refs
            # set1.difference(set/dict) walks all of set1, and checks if it
            # exists in 'other'.
            # set1.difference(iterable) walks all of iterable, and does a
            # 'difference_update' on a clone of set1. Pick wisely based on the
            # expected sizes of objects.
            # In our case it is expected that 'next_refs' will always be quite
            # small.
            next_refs = next_refs.difference(all_old_chks)
            next_refs = next_refs.difference(processed_new_refs)
            processed_new_refs.update(next_refs)
            refs = next_refs

    def _process_next_old(self):
        # We don't filter uninteresting refs any further than we did during
        # _read_all_roots, so process the whole queue in a single pass.
        refs = self._old_queue
        self._old_queue = []
        all_old_chks = self._all_old_chks
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
            # TODO: Use StaticTuple here?
            self._all_old_items.update(items)
            refs = [r for _, r in prefix_refs if r not in all_old_chks]
            self._old_queue.extend(refs)
            all_old_chks.update(refs)

    def _process_queues(self):
        while self._old_queue:
            self._process_next_old()
        return self._flush_new_queue()

    def process(self):
        for record in self._read_all_roots():
            yield record, []
        for record, items in self._process_queues():
            yield record, items


def iter_interesting_nodes(store, interesting_root_keys,
                           uninteresting_root_keys, pb=None):
    """Given root keys, find interesting nodes.

    Evaluate nodes referenced by interesting_root_keys. Ones that are also
    referenced from uninteresting_root_keys are not considered interesting.

    :param interesting_root_keys: keys which should be part of the
        "interesting" nodes (which will be yielded)
    :param uninteresting_root_keys: keys which should be filtered out of the
        result set.
    :return: An iterator of (interesting record, [(key, value), ...]) pairs.
    """
    iterator = CHKMapDifference(store, interesting_root_keys,
                                uninteresting_root_keys,
                                search_key_func=store._search_key_func,
                                pb=pb)
    return iterator.process()


try:
    from ._chk_map_pyx import (
        _bytes_to_text_key,
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
except ImportError as e:
    osutils.failed_to_load_extension(e)
    from ._chk_map_py import (
        _bytes_to_text_key,
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )  # noqa: F401
search_key_registry.register(b'hash-16-way', _search_key_16)
search_key_registry.register(b'hash-255-way', _search_key_255)
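

# Usage sketch for iter_interesting_nodes (illustrative only; the function
# name and variables below are hypothetical, not part of this module's API).
# It assumes `store` exposes the _search_key_func attribute read above, and
# that the root keys are CHK page keys such as (b'sha1:...',). Note that
# `record` can be None for items discovered while reading the root pages.
def _example_stream_new_content(store, new_root_key, old_root_key):
    new_items = {}
    for record, items in iter_interesting_nodes(
            store, [new_root_key], [old_root_key]):
        if record is not None:
            pass  # a stored page on the 'new' side; a fetch would copy it
        new_items.update(items)
    return new_items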


def _check_key(key):
    """Helper function to assert that a key is properly formatted.

    This generally shouldn't be used in production code, but it can be helpful
    to debug problems.
    """
    if not isinstance(key, StaticTuple):
        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
    if len(key) != 1:
        raise ValueError('key %r should have length 1, not %d' %
                         (key, len(key),))
    # CHK page keys hold bytes (e.g. (b'sha1:...',)), matching the bytes
    # headers checked in _deserialise above.
    if not isinstance(key[0], bytes):
        raise TypeError('key %r should hold bytes, not %r'
                        % (key, type(key[0])))
    if not key[0].startswith(b'sha1:'):
        raise ValueError('key %r should point to a sha1:' % (key,))
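

# Illustrative only: a well-formed key as _check_key expects it, i.e. a
# one-element StaticTuple holding bytes that start with b'sha1:'. The digest
# below is a made-up placeholder, not a real sha1.
def _example_check_key():
    key = StaticTuple(b'sha1:' + b'0' * 40)
    _check_key(key)  # passes silently for a properly formatted key
    return key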