1# Copyright (C) 2008-2011 Canonical Ltd 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16# 17 18"""B+Tree indices""" 19 20from io import BytesIO 21 22from ..lazy_import import lazy_import 23lazy_import(globals(), """ 24import bisect 25import math 26import tempfile 27import zlib 28""") 29 30from .. import ( 31 chunk_writer, 32 debug, 33 fifo_cache, 34 lru_cache, 35 osutils, 36 static_tuple, 37 trace, 38 transport, 39 ) 40from . import ( 41 index, 42 ) 43from .index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN 44 45 46_BTSIGNATURE = b"B+Tree Graph Index 2\n" 47_OPTION_ROW_LENGTHS = b"row_lengths=" 48_LEAF_FLAG = b"type=leaf\n" 49_INTERNAL_FLAG = b"type=internal\n" 50_INTERNAL_OFFSET = b"offset=" 51 52_RESERVED_HEADER_BYTES = 120 53_PAGE_SIZE = 4096 54 55# 4K per page: 4MB - 1000 entries 56_NODE_CACHE_SIZE = 1000 57 58 59class _BuilderRow(object): 60 """The stored state accumulated while writing out a row in the index. 61 62 :ivar spool: A temporary file used to accumulate nodes for this row 63 in the tree. 64 :ivar nodes: The count of nodes emitted so far. 65 """ 66 67 def __init__(self): 68 """Create a _BuilderRow.""" 69 self.nodes = 0 70 self.spool = None # tempfile.TemporaryFile(prefix='bzr-index-row-') 71 self.writer = None 72 73 def finish_node(self, pad=True): 74 byte_lines, _, padding = self.writer.finish() 75 if self.nodes == 0: 76 self.spool = BytesIO() 77 # padded note: 78 self.spool.write(b"\x00" * _RESERVED_HEADER_BYTES) 79 elif self.nodes == 1: 80 # We got bigger than 1 node, switch to a temp file 81 spool = tempfile.TemporaryFile(prefix='bzr-index-row-') 82 spool.write(self.spool.getvalue()) 83 self.spool = spool 84 skipped_bytes = 0 85 if not pad and padding: 86 del byte_lines[-1] 87 skipped_bytes = padding 88 self.spool.writelines(byte_lines) 89 remainder = (self.spool.tell() + skipped_bytes) % _PAGE_SIZE 90 if remainder != 0: 91 raise AssertionError("incorrect node length: %d, %d" 92 % (self.spool.tell(), remainder)) 93 self.nodes += 1 94 self.writer = None 95 96 97class _InternalBuilderRow(_BuilderRow): 98 """The stored state accumulated while writing out internal rows.""" 99 100 def finish_node(self, pad=True): 101 if not pad: 102 raise AssertionError("Must pad internal nodes only.") 103 _BuilderRow.finish_node(self) 104 105 106class _LeafBuilderRow(_BuilderRow): 107 """The stored state accumulated while writing out a leaf rows.""" 108 109 110class BTreeBuilder(index.GraphIndexBuilder): 111 """A Builder for B+Tree based Graph indices. 112 113 The resulting graph has the structure: 114 115 _SIGNATURE OPTIONS NODES 116 _SIGNATURE := 'B+Tree Graph Index 1' NEWLINE 117 OPTIONS := REF_LISTS KEY_ELEMENTS LENGTH 118 REF_LISTS := 'node_ref_lists=' DIGITS NEWLINE 119 KEY_ELEMENTS := 'key_elements=' DIGITS NEWLINE 120 LENGTH := 'len=' DIGITS NEWLINE 121 ROW_LENGTHS := 'row_lengths' DIGITS (COMMA DIGITS)* 122 NODES := NODE_COMPRESSED* 123 NODE_COMPRESSED:= COMPRESSED_BYTES{4096} 124 NODE_RAW := INTERNAL | LEAF 125 INTERNAL := INTERNAL_FLAG POINTERS 126 LEAF := LEAF_FLAG ROWS 127 KEY_ELEMENT := Not-whitespace-utf8 128 KEY := KEY_ELEMENT (NULL KEY_ELEMENT)* 129 ROWS := ROW* 130 ROW := KEY NULL ABSENT? NULL REFERENCES NULL VALUE NEWLINE 131 ABSENT := 'a' 132 REFERENCES := REFERENCE_LIST (TAB REFERENCE_LIST){node_ref_lists - 1} 133 REFERENCE_LIST := (REFERENCE (CR REFERENCE)*)? 134 REFERENCE := KEY 135 VALUE := no-newline-no-null-bytes 136 """ 137 138 def __init__(self, reference_lists=0, key_elements=1, spill_at=100000): 139 """See GraphIndexBuilder.__init__. 140 141 :param spill_at: Optional parameter controlling the maximum number 142 of nodes that BTreeBuilder will hold in memory. 143 """ 144 index.GraphIndexBuilder.__init__(self, reference_lists=reference_lists, 145 key_elements=key_elements) 146 self._spill_at = spill_at 147 self._backing_indices = [] 148 # A map of {key: (node_refs, value)} 149 self._nodes = {} 150 # Indicate it hasn't been built yet 151 self._nodes_by_key = None 152 self._optimize_for_size = False 153 154 def add_node(self, key, value, references=()): 155 """Add a node to the index. 156 157 If adding the node causes the builder to reach its spill_at threshold, 158 disk spilling will be triggered. 159 160 :param key: The key. keys are non-empty tuples containing 161 as many whitespace-free utf8 bytestrings as the key length 162 defined for this index. 163 :param references: An iterable of iterables of keys. Each is a 164 reference to another key. 165 :param value: The value to associate with the key. It may be any 166 bytes as long as it does not contain \\0 or \\n. 167 """ 168 # Ensure that 'key' is a StaticTuple 169 key = static_tuple.StaticTuple.from_sequence(key).intern() 170 # we don't care about absent_references 171 node_refs, _ = self._check_key_ref_value(key, references, value) 172 if key in self._nodes: 173 raise index.BadIndexDuplicateKey(key, self) 174 self._nodes[key] = static_tuple.StaticTuple(node_refs, value) 175 if self._nodes_by_key is not None and self._key_length > 1: 176 self._update_nodes_by_key(key, value, node_refs) 177 if len(self._nodes) < self._spill_at: 178 return 179 self._spill_mem_keys_to_disk() 180 181 def _spill_mem_keys_to_disk(self): 182 """Write the in memory keys down to disk to cap memory consumption. 183 184 If we already have some keys written to disk, we will combine them so 185 as to preserve the sorted order. The algorithm for combining uses 186 powers of two. So on the first spill, write all mem nodes into a 187 single index. On the second spill, combine the mem nodes with the nodes 188 on disk to create a 2x sized disk index and get rid of the first index. 189 On the third spill, create a single new disk index, which will contain 190 the mem nodes, and preserve the existing 2x sized index. On the fourth, 191 combine mem with the first and second indexes, creating a new one of 192 size 4x. On the fifth create a single new one, etc. 193 """ 194 if self._combine_backing_indices: 195 (new_backing_file, size, 196 backing_pos) = self._spill_mem_keys_and_combine() 197 else: 198 new_backing_file, size = self._spill_mem_keys_without_combining() 199 # Note: The transport here isn't strictly needed, because we will use 200 # direct access to the new_backing._file object 201 new_backing = BTreeGraphIndex(transport.get_transport_from_path('.'), 202 '<temp>', size) 203 # GC will clean up the file 204 new_backing._file = new_backing_file 205 if self._combine_backing_indices: 206 if len(self._backing_indices) == backing_pos: 207 self._backing_indices.append(None) 208 self._backing_indices[backing_pos] = new_backing 209 for backing_pos in range(backing_pos): 210 self._backing_indices[backing_pos] = None 211 else: 212 self._backing_indices.append(new_backing) 213 self._nodes = {} 214 self._nodes_by_key = None 215 216 def _spill_mem_keys_without_combining(self): 217 return self._write_nodes(self._iter_mem_nodes(), allow_optimize=False) 218 219 def _spill_mem_keys_and_combine(self): 220 iterators_to_combine = [self._iter_mem_nodes()] 221 pos = -1 222 for pos, backing in enumerate(self._backing_indices): 223 if backing is None: 224 pos -= 1 225 break 226 iterators_to_combine.append(backing.iter_all_entries()) 227 backing_pos = pos + 1 228 new_backing_file, size = \ 229 self._write_nodes(self._iter_smallest(iterators_to_combine), 230 allow_optimize=False) 231 return new_backing_file, size, backing_pos 232 233 def add_nodes(self, nodes): 234 """Add nodes to the index. 235 236 :param nodes: An iterable of (key, node_refs, value) entries to add. 237 """ 238 if self.reference_lists: 239 for (key, value, node_refs) in nodes: 240 self.add_node(key, value, node_refs) 241 else: 242 for (key, value) in nodes: 243 self.add_node(key, value) 244 245 def _iter_mem_nodes(self): 246 """Iterate over the nodes held in memory.""" 247 nodes = self._nodes 248 if self.reference_lists: 249 for key in sorted(nodes): 250 references, value = nodes[key] 251 yield self, key, value, references 252 else: 253 for key in sorted(nodes): 254 references, value = nodes[key] 255 yield self, key, value 256 257 def _iter_smallest(self, iterators_to_combine): 258 if len(iterators_to_combine) == 1: 259 for value in iterators_to_combine[0]: 260 yield value 261 return 262 current_values = [] 263 for iterator in iterators_to_combine: 264 try: 265 current_values.append(next(iterator)) 266 except StopIteration: 267 current_values.append(None) 268 last = None 269 while True: 270 # Decorate candidates with the value to allow 2.4's min to be used. 271 candidates = [(item[1][1], item) for item 272 in enumerate(current_values) if item[1] is not None] 273 if not len(candidates): 274 return 275 selected = min(candidates) 276 # undecorate back to (pos, node) 277 selected = selected[1] 278 if last == selected[1][1]: 279 raise index.BadIndexDuplicateKey(last, self) 280 last = selected[1][1] 281 # Yield, with self as the index 282 yield (self,) + selected[1][1:] 283 pos = selected[0] 284 try: 285 current_values[pos] = next(iterators_to_combine[pos]) 286 except StopIteration: 287 current_values[pos] = None 288 289 def _add_key(self, string_key, line, rows, allow_optimize=True): 290 """Add a key to the current chunk. 291 292 :param string_key: The key to add. 293 :param line: The fully serialised key and value. 294 :param allow_optimize: If set to False, prevent setting the optimize 295 flag when writing out. This is used by the _spill_mem_keys_to_disk 296 functionality. 297 """ 298 new_leaf = False 299 if rows[-1].writer is None: 300 # opening a new leaf chunk; 301 new_leaf = True 302 for pos, internal_row in enumerate(rows[:-1]): 303 # flesh out any internal nodes that are needed to 304 # preserve the height of the tree 305 if internal_row.writer is None: 306 length = _PAGE_SIZE 307 if internal_row.nodes == 0: 308 length -= _RESERVED_HEADER_BYTES # padded 309 if allow_optimize: 310 optimize_for_size = self._optimize_for_size 311 else: 312 optimize_for_size = False 313 internal_row.writer = chunk_writer.ChunkWriter( 314 length, 0, optimize_for_size=optimize_for_size) 315 internal_row.writer.write(_INTERNAL_FLAG) 316 internal_row.writer.write(_INTERNAL_OFFSET 317 + b"%d\n" % rows[pos + 1].nodes) 318 # add a new leaf 319 length = _PAGE_SIZE 320 if rows[-1].nodes == 0: 321 length -= _RESERVED_HEADER_BYTES # padded 322 rows[-1].writer = chunk_writer.ChunkWriter( 323 length, optimize_for_size=self._optimize_for_size) 324 rows[-1].writer.write(_LEAF_FLAG) 325 if rows[-1].writer.write(line): 326 # if we failed to write, despite having an empty page to write to, 327 # then line is too big. raising the error avoids infinite recursion 328 # searching for a suitably large page that will not be found. 329 if new_leaf: 330 raise index.BadIndexKey(string_key) 331 # this key did not fit in the node: 332 rows[-1].finish_node() 333 key_line = string_key + b"\n" 334 new_row = True 335 for row in reversed(rows[:-1]): 336 # Mark the start of the next node in the node above. If it 337 # doesn't fit then propagate upwards until we find one that 338 # it does fit into. 339 if row.writer.write(key_line): 340 row.finish_node() 341 else: 342 # We've found a node that can handle the pointer. 343 new_row = False 344 break 345 # If we reached the current root without being able to mark the 346 # division point, then we need a new root: 347 if new_row: 348 # We need a new row 349 if 'index' in debug.debug_flags: 350 trace.mutter('Inserting new global row.') 351 new_row = _InternalBuilderRow() 352 reserved_bytes = 0 353 rows.insert(0, new_row) 354 # This will be padded, hence the -100 355 new_row.writer = chunk_writer.ChunkWriter( 356 _PAGE_SIZE - _RESERVED_HEADER_BYTES, 357 reserved_bytes, 358 optimize_for_size=self._optimize_for_size) 359 new_row.writer.write(_INTERNAL_FLAG) 360 new_row.writer.write(_INTERNAL_OFFSET 361 + b"%d\n" % (rows[1].nodes - 1)) 362 new_row.writer.write(key_line) 363 self._add_key(string_key, line, rows, 364 allow_optimize=allow_optimize) 365 366 def _write_nodes(self, node_iterator, allow_optimize=True): 367 """Write node_iterator out as a B+Tree. 368 369 :param node_iterator: An iterator of sorted nodes. Each node should 370 match the output given by iter_all_entries. 371 :param allow_optimize: If set to False, prevent setting the optimize 372 flag when writing out. This is used by the _spill_mem_keys_to_disk 373 functionality. 374 :return: A file handle for a temporary file containing a B+Tree for 375 the nodes. 376 """ 377 # The index rows - rows[0] is the root, rows[1] is the layer under it 378 # etc. 379 rows = [] 380 # forward sorted by key. In future we may consider topological sorting, 381 # at the cost of table scans for direct lookup, or a second index for 382 # direct lookup 383 key_count = 0 384 # A stack with the number of nodes of each size. 0 is the root node 385 # and must always be 1 (if there are any nodes in the tree). 386 self.row_lengths = [] 387 # Loop over all nodes adding them to the bottom row 388 # (rows[-1]). When we finish a chunk in a row, 389 # propagate the key that didn't fit (comes after the chunk) to the 390 # row above, transitively. 391 for node in node_iterator: 392 if key_count == 0: 393 # First key triggers the first row 394 rows.append(_LeafBuilderRow()) 395 key_count += 1 396 string_key, line = _btree_serializer._flatten_node( 397 node, self.reference_lists) 398 self._add_key(string_key, line, rows, 399 allow_optimize=allow_optimize) 400 for row in reversed(rows): 401 pad = (not isinstance(row, _LeafBuilderRow)) 402 row.finish_node(pad=pad) 403 lines = [_BTSIGNATURE] 404 lines.append(b'%s%d\n' % (_OPTION_NODE_REFS, self.reference_lists)) 405 lines.append(b'%s%d\n' % (_OPTION_KEY_ELEMENTS, self._key_length)) 406 lines.append(b'%s%d\n' % (_OPTION_LEN, key_count)) 407 row_lengths = [row.nodes for row in rows] 408 lines.append(_OPTION_ROW_LENGTHS + ','.join( 409 map(str, row_lengths)).encode('ascii') + b'\n') 410 if row_lengths and row_lengths[-1] > 1: 411 result = tempfile.NamedTemporaryFile(prefix='bzr-index-') 412 else: 413 result = BytesIO() 414 result.writelines(lines) 415 position = sum(map(len, lines)) 416 if position > _RESERVED_HEADER_BYTES: 417 raise AssertionError("Could not fit the header in the" 418 " reserved space: %d > %d" 419 % (position, _RESERVED_HEADER_BYTES)) 420 # write the rows out: 421 for row in rows: 422 reserved = _RESERVED_HEADER_BYTES # reserved space for first node 423 row.spool.flush() 424 row.spool.seek(0) 425 # copy nodes to the finalised file. 426 # Special case the first node as it may be prefixed 427 node = row.spool.read(_PAGE_SIZE) 428 result.write(node[reserved:]) 429 if len(node) == _PAGE_SIZE: 430 result.write(b"\x00" * (reserved - position)) 431 position = 0 # Only the root row actually has an offset 432 copied_len = osutils.pumpfile(row.spool, result) 433 if copied_len != (row.nodes - 1) * _PAGE_SIZE: 434 if not isinstance(row, _LeafBuilderRow): 435 raise AssertionError("Incorrect amount of data copied" 436 " expected: %d, got: %d" 437 % ((row.nodes - 1) * _PAGE_SIZE, 438 copied_len)) 439 result.flush() 440 size = result.tell() 441 result.seek(0) 442 return result, size 443 444 def finish(self): 445 """Finalise the index. 446 447 :return: A file handle for a temporary file containing the nodes added 448 to the index. 449 """ 450 return self._write_nodes(self.iter_all_entries())[0] 451 452 def iter_all_entries(self): 453 """Iterate over all keys within the index 454 455 :return: An iterable of (index, key, value, reference_lists). There is 456 no defined order for the result iteration - it will be in the most 457 efficient order for the index (in this case dictionary hash order). 458 """ 459 if 'evil' in debug.debug_flags: 460 trace.mutter_callsite( 461 3, "iter_all_entries scales with size of history.") 462 # Doing serial rather than ordered would be faster; but this shouldn't 463 # be getting called routinely anyway. 464 iterators = [self._iter_mem_nodes()] 465 for backing in self._backing_indices: 466 if backing is not None: 467 iterators.append(backing.iter_all_entries()) 468 if len(iterators) == 1: 469 return iterators[0] 470 return self._iter_smallest(iterators) 471 472 def iter_entries(self, keys): 473 """Iterate over keys within the index. 474 475 :param keys: An iterable providing the keys to be retrieved. 476 :return: An iterable of (index, key, value, reference_lists). There is 477 no defined order for the result iteration - it will be in the most 478 efficient order for the index (keys iteration order in this case). 479 """ 480 keys = set(keys) 481 # Note: We don't use keys.intersection() here. If you read the C api, 482 # set.intersection(other) special cases when other is a set and 483 # will iterate the smaller of the two and lookup in the other. 484 # It does *not* do this for any other type (even dict, unlike 485 # some other set functions.) Since we expect keys is generally << 486 # self._nodes, it is faster to iterate over it in a list 487 # comprehension 488 nodes = self._nodes 489 local_keys = [key for key in keys if key in nodes] 490 if self.reference_lists: 491 for key in local_keys: 492 node = nodes[key] 493 yield self, key, node[1], node[0] 494 else: 495 for key in local_keys: 496 node = nodes[key] 497 yield self, key, node[1] 498 # Find things that are in backing indices that have not been handled 499 # yet. 500 if not self._backing_indices: 501 return # We won't find anything there either 502 # Remove all of the keys that we found locally 503 keys.difference_update(local_keys) 504 for backing in self._backing_indices: 505 if backing is None: 506 continue 507 if not keys: 508 return 509 for node in backing.iter_entries(keys): 510 keys.remove(node[1]) 511 yield (self,) + node[1:] 512 513 def iter_entries_prefix(self, keys): 514 """Iterate over keys within the index using prefix matching. 515 516 Prefix matching is applied within the tuple of a key, not to within 517 the bytestring of each key element. e.g. if you have the keys ('foo', 518 'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then 519 only the former key is returned. 520 521 :param keys: An iterable providing the key prefixes to be retrieved. 522 Each key prefix takes the form of a tuple the length of a key, but 523 with the last N elements 'None' rather than a regular bytestring. 524 The first element cannot be 'None'. 525 :return: An iterable as per iter_all_entries, but restricted to the 526 keys with a matching prefix to those supplied. No additional keys 527 will be returned, and every match that is in the index will be 528 returned. 529 """ 530 keys = set(keys) 531 if not keys: 532 return 533 for backing in self._backing_indices: 534 if backing is None: 535 continue 536 for node in backing.iter_entries_prefix(keys): 537 yield (self,) + node[1:] 538 if self._key_length == 1: 539 for key in keys: 540 index._sanity_check_key(self, key) 541 try: 542 node = self._nodes[key] 543 except KeyError: 544 continue 545 if self.reference_lists: 546 yield self, key, node[1], node[0] 547 else: 548 yield self, key, node[1] 549 return 550 nodes_by_key = self._get_nodes_by_key() 551 for entry in index._iter_entries_prefix(self, nodes_by_key, keys): 552 yield entry 553 554 def _get_nodes_by_key(self): 555 if self._nodes_by_key is None: 556 nodes_by_key = {} 557 if self.reference_lists: 558 for key, (references, value) in self._nodes.items(): 559 key_dict = nodes_by_key 560 for subkey in key[:-1]: 561 key_dict = key_dict.setdefault(subkey, {}) 562 key_dict[key[-1]] = key, value, references 563 else: 564 for key, (references, value) in self._nodes.items(): 565 key_dict = nodes_by_key 566 for subkey in key[:-1]: 567 key_dict = key_dict.setdefault(subkey, {}) 568 key_dict[key[-1]] = key, value 569 self._nodes_by_key = nodes_by_key 570 return self._nodes_by_key 571 572 def key_count(self): 573 """Return an estimate of the number of keys in this index. 574 575 For InMemoryGraphIndex the estimate is exact. 576 """ 577 return len(self._nodes) + sum( 578 backing.key_count() 579 for backing in self._backing_indices 580 if backing is not None) 581 582 def validate(self): 583 """In memory index's have no known corruption at the moment.""" 584 585 def __lt__(self, other): 586 if isinstance(other, type(self)): 587 return self._nodes < other._nodes 588 # Always sort existing indexes before ones that are still being built. 589 if isinstance(other, BTreeGraphIndex): 590 return False 591 raise TypeError 592 593 594class _LeafNode(dict): 595 """A leaf node for a serialised B+Tree index.""" 596 597 __slots__ = ('min_key', 'max_key', '_keys') 598 599 def __init__(self, bytes, key_length, ref_list_length): 600 """Parse bytes to create a leaf node object.""" 601 # splitlines mangles the \r delimiters.. don't use it. 602 key_list = _btree_serializer._parse_leaf_lines( 603 bytes, key_length, ref_list_length) 604 if key_list: 605 self.min_key = key_list[0][0] 606 self.max_key = key_list[-1][0] 607 else: 608 self.min_key = self.max_key = None 609 super(_LeafNode, self).__init__(key_list) 610 self._keys = dict(self) 611 612 def all_items(self): 613 """Return a sorted list of (key, (value, refs)) items""" 614 items = sorted(self.items()) 615 return items 616 617 def all_keys(self): 618 """Return a sorted list of all keys.""" 619 keys = sorted(self.keys()) 620 return keys 621 622 623class _InternalNode(object): 624 """An internal node for a serialised B+Tree index.""" 625 626 __slots__ = ('keys', 'offset') 627 628 def __init__(self, bytes): 629 """Parse bytes to create an internal node object.""" 630 # splitlines mangles the \r delimiters.. don't use it. 631 self.keys = self._parse_lines(bytes.split(b'\n')) 632 633 def _parse_lines(self, lines): 634 nodes = [] 635 self.offset = int(lines[1][7:]) 636 as_st = static_tuple.StaticTuple.from_sequence 637 for line in lines[2:]: 638 if line == b'': 639 break 640 # GZ 2017-05-24: Used to intern() each chunk of line as well, need 641 # to recheck performance and perhaps adapt StaticTuple to adjust. 642 nodes.append(as_st(line.split(b'\0')).intern()) 643 return nodes 644 645 646class BTreeGraphIndex(object): 647 """Access to nodes via the standard GraphIndex interface for B+Tree's. 648 649 Individual nodes are held in a LRU cache. This holds the root node in 650 memory except when very large walks are done. 651 """ 652 653 def __init__(self, transport, name, size, unlimited_cache=False, 654 offset=0): 655 """Create a B+Tree index object on the index name. 656 657 :param transport: The transport to read data for the index from. 658 :param name: The file name of the index on transport. 659 :param size: Optional size of the index in bytes. This allows 660 compatibility with the GraphIndex API, as well as ensuring that 661 the initial read (to read the root node header) can be done 662 without over-reading even on empty indices, and on small indices 663 allows single-IO to read the entire index. 664 :param unlimited_cache: If set to True, then instead of using an 665 LRUCache with size _NODE_CACHE_SIZE, we will use a dict and always 666 cache all leaf nodes. 667 :param offset: The start of the btree index data isn't byte 0 of the 668 file. Instead it starts at some point later. 669 """ 670 self._transport = transport 671 self._name = name 672 self._size = size 673 self._file = None 674 self._recommended_pages = self._compute_recommended_pages() 675 self._root_node = None 676 self._base_offset = offset 677 self._leaf_factory = _LeafNode 678 # Default max size is 100,000 leave values 679 self._leaf_value_cache = None # lru_cache.LRUCache(100*1000) 680 if unlimited_cache: 681 self._leaf_node_cache = {} 682 self._internal_node_cache = {} 683 else: 684 self._leaf_node_cache = lru_cache.LRUCache(_NODE_CACHE_SIZE) 685 # We use a FIFO here just to prevent possible blowout. However, a 686 # 300k record btree has only 3k leaf nodes, and only 20 internal 687 # nodes. A value of 100 scales to ~100*100*100 = 1M records. 688 self._internal_node_cache = fifo_cache.FIFOCache(100) 689 self._key_count = None 690 self._row_lengths = None 691 self._row_offsets = None # Start of each row, [-1] is the end 692 693 def __hash__(self): 694 return id(self) 695 696 def __eq__(self, other): 697 """Equal when self and other were created with the same parameters.""" 698 return ( 699 isinstance(self, type(other)) 700 and self._transport == other._transport 701 and self._name == other._name 702 and self._size == other._size) 703 704 def __lt__(self, other): 705 if isinstance(other, type(self)): 706 return ((self._name, self._size) < (other._name, other._size)) 707 # Always sort existing indexes before ones that are still being built. 708 if isinstance(other, BTreeBuilder): 709 return True 710 raise TypeError 711 712 def __ne__(self, other): 713 return not self.__eq__(other) 714 715 def _get_and_cache_nodes(self, nodes): 716 """Read nodes and cache them in the lru. 717 718 The nodes list supplied is sorted and then read from disk, each node 719 being inserted it into the _node_cache. 720 721 Note: Asking for more nodes than the _node_cache can contain will 722 result in some of the results being immediately discarded, to prevent 723 this an assertion is raised if more nodes are asked for than are 724 cachable. 725 726 :return: A dict of {node_pos: node} 727 """ 728 found = {} 729 start_of_leaves = None 730 for node_pos, node in self._read_nodes(sorted(nodes)): 731 if node_pos == 0: # Special case 732 self._root_node = node 733 else: 734 if start_of_leaves is None: 735 start_of_leaves = self._row_offsets[-2] 736 if node_pos < start_of_leaves: 737 self._internal_node_cache[node_pos] = node 738 else: 739 self._leaf_node_cache[node_pos] = node 740 found[node_pos] = node 741 return found 742 743 def _compute_recommended_pages(self): 744 """Convert transport's recommended_page_size into btree pages. 745 746 recommended_page_size is in bytes, we want to know how many _PAGE_SIZE 747 pages fit in that length. 748 """ 749 recommended_read = self._transport.recommended_page_size() 750 recommended_pages = int(math.ceil(recommended_read / _PAGE_SIZE)) 751 return recommended_pages 752 753 def _compute_total_pages_in_index(self): 754 """How many pages are in the index. 755 756 If we have read the header we will use the value stored there. 757 Otherwise it will be computed based on the length of the index. 758 """ 759 if self._size is None: 760 raise AssertionError('_compute_total_pages_in_index should not be' 761 ' called when self._size is None') 762 if self._root_node is not None: 763 # This is the number of pages as defined by the header 764 return self._row_offsets[-1] 765 # This is the number of pages as defined by the size of the index. They 766 # should be indentical. 767 total_pages = int(math.ceil(self._size / _PAGE_SIZE)) 768 return total_pages 769 770 def _expand_offsets(self, offsets): 771 """Find extra pages to download. 772 773 The idea is that we always want to make big-enough requests (like 64kB 774 for http), so that we don't waste round trips. So given the entries 775 that we already have cached and the new pages being downloaded figure 776 out what other pages we might want to read. 777 778 See also doc/developers/btree_index_prefetch.txt for more details. 779 780 :param offsets: The offsets to be read 781 :return: A list of offsets to download 782 """ 783 if 'index' in debug.debug_flags: 784 trace.mutter('expanding: %s\toffsets: %s', self._name, offsets) 785 786 if len(offsets) >= self._recommended_pages: 787 # Don't add more, we are already requesting more than enough 788 if 'index' in debug.debug_flags: 789 trace.mutter(' not expanding large request (%s >= %s)', 790 len(offsets), self._recommended_pages) 791 return offsets 792 if self._size is None: 793 # Don't try anything, because we don't know where the file ends 794 if 'index' in debug.debug_flags: 795 trace.mutter(' not expanding without knowing index size') 796 return offsets 797 total_pages = self._compute_total_pages_in_index() 798 cached_offsets = self._get_offsets_to_cached_pages() 799 # If reading recommended_pages would read the rest of the index, just 800 # do so. 801 if total_pages - len(cached_offsets) <= self._recommended_pages: 802 # Read whatever is left 803 if cached_offsets: 804 expanded = [x for x in range(total_pages) 805 if x not in cached_offsets] 806 else: 807 expanded = list(range(total_pages)) 808 if 'index' in debug.debug_flags: 809 trace.mutter(' reading all unread pages: %s', expanded) 810 return expanded 811 812 if self._root_node is None: 813 # ATM on the first read of the root node of a large index, we don't 814 # bother pre-reading any other pages. This is because the 815 # likelyhood of actually reading interesting pages is very low. 816 # See doc/developers/btree_index_prefetch.txt for a discussion, and 817 # a possible implementation when we are guessing that the second 818 # layer index is small 819 final_offsets = offsets 820 else: 821 tree_depth = len(self._row_lengths) 822 if len(cached_offsets) < tree_depth and len(offsets) == 1: 823 # We haven't read enough to justify expansion 824 # If we are only going to read the root node, and 1 leaf node, 825 # then it isn't worth expanding our request. Once we've read at 826 # least 2 nodes, then we are probably doing a search, and we 827 # start expanding our requests. 828 if 'index' in debug.debug_flags: 829 trace.mutter(' not expanding on first reads') 830 return offsets 831 final_offsets = self._expand_to_neighbors(offsets, cached_offsets, 832 total_pages) 833 834 final_offsets = sorted(final_offsets) 835 if 'index' in debug.debug_flags: 836 trace.mutter('expanded: %s', final_offsets) 837 return final_offsets 838 839 def _expand_to_neighbors(self, offsets, cached_offsets, total_pages): 840 """Expand requests to neighbors until we have enough pages. 841 842 This is called from _expand_offsets after policy has determined that we 843 want to expand. 844 We only want to expand requests within a given layer. We cheat a little 845 bit and assume all requests will be in the same layer. This is true 846 given the current design, but if it changes this algorithm may perform 847 oddly. 848 849 :param offsets: requested offsets 850 :param cached_offsets: offsets for pages we currently have cached 851 :return: A set() of offsets after expansion 852 """ 853 final_offsets = set(offsets) 854 first = end = None 855 new_tips = set(final_offsets) 856 while len(final_offsets) < self._recommended_pages and new_tips: 857 next_tips = set() 858 for pos in new_tips: 859 if first is None: 860 first, end = self._find_layer_first_and_end(pos) 861 previous = pos - 1 862 if (previous > 0 and 863 previous not in cached_offsets and 864 previous not in final_offsets and 865 previous >= first): 866 next_tips.add(previous) 867 after = pos + 1 868 if (after < total_pages and 869 after not in cached_offsets and 870 after not in final_offsets and 871 after < end): 872 next_tips.add(after) 873 # This would keep us from going bigger than 874 # recommended_pages by only expanding the first offsets. 875 # However, if we are making a 'wide' request, it is 876 # reasonable to expand all points equally. 877 # if len(final_offsets) > recommended_pages: 878 # break 879 final_offsets.update(next_tips) 880 new_tips = next_tips 881 return final_offsets 882 883 def clear_cache(self): 884 """Clear out any cached/memoized values. 885 886 This can be called at any time, but generally it is used when we have 887 extracted some information, but don't expect to be requesting any more 888 from this index. 889 """ 890 # Note that we don't touch self._root_node or self._internal_node_cache 891 # We don't expect either of those to be big, and it can save 892 # round-trips in the future. We may re-evaluate this if InternalNode 893 # memory starts to be an issue. 894 self._leaf_node_cache.clear() 895 896 def external_references(self, ref_list_num): 897 if self._root_node is None: 898 self._get_root_node() 899 if ref_list_num + 1 > self.node_ref_lists: 900 raise ValueError('No ref list %d, index has %d ref lists' 901 % (ref_list_num, self.node_ref_lists)) 902 keys = set() 903 refs = set() 904 for node in self.iter_all_entries(): 905 keys.add(node[1]) 906 refs.update(node[3][ref_list_num]) 907 return refs - keys 908 909 def _find_layer_first_and_end(self, offset): 910 """Find the start/stop nodes for the layer corresponding to offset. 911 912 :return: (first, end) 913 first is the first node in this layer 914 end is the first node of the next layer 915 """ 916 first = end = 0 917 for roffset in self._row_offsets: 918 first = end 919 end = roffset 920 if offset < roffset: 921 break 922 return first, end 923 924 def _get_offsets_to_cached_pages(self): 925 """Determine what nodes we already have cached.""" 926 cached_offsets = set(self._internal_node_cache) 927 # cache may be dict or LRUCache, keys() is the common method 928 cached_offsets.update(self._leaf_node_cache.keys()) 929 if self._root_node is not None: 930 cached_offsets.add(0) 931 return cached_offsets 932 933 def _get_root_node(self): 934 if self._root_node is None: 935 # We may not have a root node yet 936 self._get_internal_nodes([0]) 937 return self._root_node 938 939 def _get_nodes(self, cache, node_indexes): 940 found = {} 941 needed = [] 942 for idx in node_indexes: 943 if idx == 0 and self._root_node is not None: 944 found[0] = self._root_node 945 continue 946 try: 947 found[idx] = cache[idx] 948 except KeyError: 949 needed.append(idx) 950 if not needed: 951 return found 952 needed = self._expand_offsets(needed) 953 found.update(self._get_and_cache_nodes(needed)) 954 return found 955 956 def _get_internal_nodes(self, node_indexes): 957 """Get a node, from cache or disk. 958 959 After getting it, the node will be cached. 960 """ 961 return self._get_nodes(self._internal_node_cache, node_indexes) 962 963 def _cache_leaf_values(self, nodes): 964 """Cache directly from key => value, skipping the btree.""" 965 if self._leaf_value_cache is not None: 966 for node in nodes.values(): 967 for key, value in node.all_items(): 968 if key in self._leaf_value_cache: 969 # Don't add the rest of the keys, we've seen this node 970 # before. 971 break 972 self._leaf_value_cache[key] = value 973 974 def _get_leaf_nodes(self, node_indexes): 975 """Get a bunch of nodes, from cache or disk.""" 976 found = self._get_nodes(self._leaf_node_cache, node_indexes) 977 self._cache_leaf_values(found) 978 return found 979 980 def iter_all_entries(self): 981 """Iterate over all keys within the index. 982 983 :return: An iterable of (index, key, value) or 984 (index, key, value, reference_lists). 985 The former tuple is used when there are no reference lists in the 986 index, making the API compatible with simple key:value index types. 987 There is no defined order for the result iteration - it will be in 988 the most efficient order for the index. 989 """ 990 if 'evil' in debug.debug_flags: 991 trace.mutter_callsite( 992 3, "iter_all_entries scales with size of history.") 993 if not self.key_count(): 994 return 995 if self._row_offsets[-1] == 1: 996 # There is only the root node, and we read that via key_count() 997 if self.node_ref_lists: 998 for key, (value, refs) in self._root_node.all_items(): 999 yield (self, key, value, refs) 1000 else: 1001 for key, (value, refs) in self._root_node.all_items(): 1002 yield (self, key, value) 1003 return 1004 start_of_leaves = self._row_offsets[-2] 1005 end_of_leaves = self._row_offsets[-1] 1006 needed_offsets = list(range(start_of_leaves, end_of_leaves)) 1007 if needed_offsets == [0]: 1008 # Special case when we only have a root node, as we have already 1009 # read everything 1010 nodes = [(0, self._root_node)] 1011 else: 1012 nodes = self._read_nodes(needed_offsets) 1013 # We iterate strictly in-order so that we can use this function 1014 # for spilling index builds to disk. 1015 if self.node_ref_lists: 1016 for _, node in nodes: 1017 for key, (value, refs) in node.all_items(): 1018 yield (self, key, value, refs) 1019 else: 1020 for _, node in nodes: 1021 for key, (value, refs) in node.all_items(): 1022 yield (self, key, value) 1023 1024 @staticmethod 1025 def _multi_bisect_right(in_keys, fixed_keys): 1026 """Find the positions where each 'in_key' would fit in fixed_keys. 1027 1028 This is equivalent to doing "bisect_right" on each in_key into 1029 fixed_keys 1030 1031 :param in_keys: A sorted list of keys to match with fixed_keys 1032 :param fixed_keys: A sorted list of keys to match against 1033 :return: A list of (integer position, [key list]) tuples. 1034 """ 1035 if not in_keys: 1036 return [] 1037 if not fixed_keys: 1038 # no pointers in the fixed_keys list, which means everything must 1039 # fall to the left. 1040 return [(0, in_keys)] 1041 1042 # TODO: Iterating both lists will generally take M + N steps 1043 # Bisecting each key will generally take M * log2 N steps. 1044 # If we had an efficient way to compare, we could pick the method 1045 # based on which has the fewer number of steps. 1046 # There is also the argument that bisect_right is a compiled 1047 # function, so there is even more to be gained. 1048 # iter_steps = len(in_keys) + len(fixed_keys) 1049 # bisect_steps = len(in_keys) * math.log(len(fixed_keys), 2) 1050 if len(in_keys) == 1: # Bisect will always be faster for M = 1 1051 return [(bisect.bisect_right(fixed_keys, in_keys[0]), in_keys)] 1052 # elif bisect_steps < iter_steps: 1053 # offsets = {} 1054 # for key in in_keys: 1055 # offsets.setdefault(bisect_right(fixed_keys, key), 1056 # []).append(key) 1057 # return [(o, offsets[o]) for o in sorted(offsets)] 1058 in_keys_iter = iter(in_keys) 1059 fixed_keys_iter = enumerate(fixed_keys) 1060 cur_in_key = next(in_keys_iter) 1061 cur_fixed_offset, cur_fixed_key = next(fixed_keys_iter) 1062 1063 class InputDone(Exception): 1064 pass 1065 1066 class FixedDone(Exception): 1067 pass 1068 1069 output = [] 1070 cur_out = [] 1071 1072 # TODO: Another possibility is that rather than iterating on each side, 1073 # we could use a combination of bisecting and iterating. For 1074 # example, while cur_in_key < fixed_key, bisect to find its 1075 # point, then iterate all matching keys, then bisect (restricted 1076 # to only the remainder) for the next one, etc. 1077 try: 1078 while True: 1079 if cur_in_key < cur_fixed_key: 1080 cur_keys = [] 1081 cur_out = (cur_fixed_offset, cur_keys) 1082 output.append(cur_out) 1083 while cur_in_key < cur_fixed_key: 1084 cur_keys.append(cur_in_key) 1085 try: 1086 cur_in_key = next(in_keys_iter) 1087 except StopIteration: 1088 raise InputDone 1089 # At this point cur_in_key must be >= cur_fixed_key 1090 # step the cur_fixed_key until we pass the cur key, or walk off 1091 # the end 1092 while cur_in_key >= cur_fixed_key: 1093 try: 1094 cur_fixed_offset, cur_fixed_key = next(fixed_keys_iter) 1095 except StopIteration: 1096 raise FixedDone 1097 except InputDone: 1098 # We consumed all of the input, nothing more to do 1099 pass 1100 except FixedDone: 1101 # There was some input left, but we consumed all of fixed, so we 1102 # have to add one more for the tail 1103 cur_keys = [cur_in_key] 1104 cur_keys.extend(in_keys_iter) 1105 cur_out = (len(fixed_keys), cur_keys) 1106 output.append(cur_out) 1107 return output 1108 1109 def _walk_through_internal_nodes(self, keys): 1110 """Take the given set of keys, and find the corresponding LeafNodes. 1111 1112 :param keys: An unsorted iterable of keys to search for 1113 :return: (nodes, index_and_keys) 1114 nodes is a dict mapping {index: LeafNode} 1115 keys_at_index is a list of tuples of [(index, [keys for Leaf])] 1116 """ 1117 # 6 seconds spent in miss_torture using the sorted() line. 1118 # Even with out of order disk IO it seems faster not to sort it when 1119 # large queries are being made. 1120 keys_at_index = [(0, sorted(keys))] 1121 1122 for row_pos, next_row_start in enumerate(self._row_offsets[1:-1]): 1123 node_indexes = [idx for idx, s_keys in keys_at_index] 1124 nodes = self._get_internal_nodes(node_indexes) 1125 1126 next_nodes_and_keys = [] 1127 for node_index, sub_keys in keys_at_index: 1128 node = nodes[node_index] 1129 positions = self._multi_bisect_right(sub_keys, node.keys) 1130 node_offset = next_row_start + node.offset 1131 next_nodes_and_keys.extend([(node_offset + pos, s_keys) 1132 for pos, s_keys in positions]) 1133 keys_at_index = next_nodes_and_keys 1134 # We should now be at the _LeafNodes 1135 node_indexes = [idx for idx, s_keys in keys_at_index] 1136 1137 # TODO: We may *not* want to always read all the nodes in one 1138 # big go. Consider setting a max size on this. 1139 nodes = self._get_leaf_nodes(node_indexes) 1140 return nodes, keys_at_index 1141 1142 def iter_entries(self, keys): 1143 """Iterate over keys within the index. 1144 1145 :param keys: An iterable providing the keys to be retrieved. 1146 :return: An iterable as per iter_all_entries, but restricted to the 1147 keys supplied. No additional keys will be returned, and every 1148 key supplied that is in the index will be returned. 1149 """ 1150 # 6 seconds spent in miss_torture using the sorted() line. 1151 # Even with out of order disk IO it seems faster not to sort it when 1152 # large queries are being made. 1153 # However, now that we are doing multi-way bisecting, we need the keys 1154 # in sorted order anyway. We could change the multi-way code to not 1155 # require sorted order. (For example, it bisects for the first node, 1156 # does an in-order search until a key comes before the current point, 1157 # which it then bisects for, etc.) 1158 keys = frozenset(keys) 1159 if not keys: 1160 return 1161 1162 if not self.key_count(): 1163 return 1164 1165 needed_keys = [] 1166 if self._leaf_value_cache is None: 1167 needed_keys = keys 1168 else: 1169 for key in keys: 1170 value = self._leaf_value_cache.get(key, None) 1171 if value is not None: 1172 # This key is known not to be here, skip it 1173 value, refs = value 1174 if self.node_ref_lists: 1175 yield (self, key, value, refs) 1176 else: 1177 yield (self, key, value) 1178 else: 1179 needed_keys.append(key) 1180 1181 needed_keys = keys 1182 if not needed_keys: 1183 return 1184 nodes, nodes_and_keys = self._walk_through_internal_nodes(needed_keys) 1185 for node_index, sub_keys in nodes_and_keys: 1186 if not sub_keys: 1187 continue 1188 node = nodes[node_index] 1189 for next_sub_key in sub_keys: 1190 if next_sub_key in node: 1191 value, refs = node[next_sub_key] 1192 if self.node_ref_lists: 1193 yield (self, next_sub_key, value, refs) 1194 else: 1195 yield (self, next_sub_key, value) 1196 1197 def _find_ancestors(self, keys, ref_list_num, parent_map, missing_keys): 1198 """Find the parent_map information for the set of keys. 1199 1200 This populates the parent_map dict and missing_keys set based on the 1201 queried keys. It also can fill out an arbitrary number of parents that 1202 it finds while searching for the supplied keys. 1203 1204 It is unlikely that you want to call this directly. See 1205 "CombinedGraphIndex.find_ancestry()" for a more appropriate API. 1206 1207 :param keys: A keys whose ancestry we want to return 1208 Every key will either end up in 'parent_map' or 'missing_keys'. 1209 :param ref_list_num: This index in the ref_lists is the parents we 1210 care about. 1211 :param parent_map: {key: parent_keys} for keys that are present in this 1212 index. This may contain more entries than were in 'keys', that are 1213 reachable ancestors of the keys requested. 1214 :param missing_keys: keys which are known to be missing in this index. 1215 This may include parents that were not directly requested, but we 1216 were able to determine that they are not present in this index. 1217 :return: search_keys parents that were found but not queried to know 1218 if they are missing or present. Callers can re-query this index for 1219 those keys, and they will be placed into parent_map or missing_keys 1220 """ 1221 if not self.key_count(): 1222 # We use key_count() to trigger reading the root node and 1223 # determining info about this BTreeGraphIndex 1224 # If we don't have any keys, then everything is missing 1225 missing_keys.update(keys) 1226 return set() 1227 if ref_list_num >= self.node_ref_lists: 1228 raise ValueError('No ref list %d, index has %d ref lists' 1229 % (ref_list_num, self.node_ref_lists)) 1230 1231 # The main trick we are trying to accomplish is that when we find a 1232 # key listing its parents, we expect that the parent key is also likely 1233 # to sit on the same page. Allowing us to expand parents quickly 1234 # without suffering the full stack of bisecting, etc. 1235 nodes, nodes_and_keys = self._walk_through_internal_nodes(keys) 1236 1237 # These are parent keys which could not be immediately resolved on the 1238 # page where the child was present. Note that we may already be 1239 # searching for that key, and it may actually be present [or known 1240 # missing] on one of the other pages we are reading. 1241 # TODO: 1242 # We could try searching for them in the immediate previous or next 1243 # page. If they occur "later" we could put them in a pending lookup 1244 # set, and then for each node we read thereafter we could check to 1245 # see if they are present. 1246 # However, we don't know the impact of keeping this list of things 1247 # that I'm going to search for every node I come across from here on 1248 # out. 1249 # It doesn't handle the case when the parent key is missing on a 1250 # page that we *don't* read. So we already have to handle being 1251 # re-entrant for that. 1252 # Since most keys contain a date string, they are more likely to be 1253 # found earlier in the file than later, but we would know that right 1254 # away (key < min_key), and wouldn't keep searching it on every other 1255 # page that we read. 1256 # Mostly, it is an idea, one which should be benchmarked. 1257 parents_not_on_page = set() 1258 1259 for node_index, sub_keys in nodes_and_keys: 1260 if not sub_keys: 1261 continue 1262 # sub_keys is all of the keys we are looking for that should exist 1263 # on this page, if they aren't here, then they won't be found 1264 node = nodes[node_index] 1265 parents_to_check = set() 1266 for next_sub_key in sub_keys: 1267 if next_sub_key not in node: 1268 # This one is just not present in the index at all 1269 missing_keys.add(next_sub_key) 1270 else: 1271 value, refs = node[next_sub_key] 1272 parent_keys = refs[ref_list_num] 1273 parent_map[next_sub_key] = parent_keys 1274 parents_to_check.update(parent_keys) 1275 # Don't look for things we've already found 1276 parents_to_check = parents_to_check.difference(parent_map) 1277 # this can be used to test the benefit of having the check loop 1278 # inlined. 1279 # parents_not_on_page.update(parents_to_check) 1280 # continue 1281 while parents_to_check: 1282 next_parents_to_check = set() 1283 for key in parents_to_check: 1284 if key in node: 1285 value, refs = node[key] 1286 parent_keys = refs[ref_list_num] 1287 parent_map[key] = parent_keys 1288 next_parents_to_check.update(parent_keys) 1289 else: 1290 # This parent either is genuinely missing, or should be 1291 # found on another page. Perf test whether it is better 1292 # to check if this node should fit on this page or not. 1293 # in the 'everything-in-one-pack' scenario, this *not* 1294 # doing the check is 237ms vs 243ms. 1295 # So slightly better, but I assume the standard 'lots 1296 # of packs' is going to show a reasonable improvement 1297 # from the check, because it avoids 'going around 1298 # again' for everything that is in another index 1299 # parents_not_on_page.add(key) 1300 # Missing for some reason 1301 if key < node.min_key: 1302 # in the case of bzr.dev, 3.4k/5.3k misses are 1303 # 'earlier' misses (65%) 1304 parents_not_on_page.add(key) 1305 elif key > node.max_key: 1306 # This parent key would be present on a different 1307 # LeafNode 1308 parents_not_on_page.add(key) 1309 else: 1310 # assert (key != node.min_key and 1311 # key != node.max_key) 1312 # If it was going to be present, it would be on 1313 # *this* page, so mark it missing. 1314 missing_keys.add(key) 1315 parents_to_check = next_parents_to_check.difference(parent_map) 1316 # Might want to do another .difference() from missing_keys 1317 # parents_not_on_page could have been found on a different page, or be 1318 # known to be missing. So cull out everything that has already been 1319 # found. 1320 search_keys = parents_not_on_page.difference( 1321 parent_map).difference(missing_keys) 1322 return search_keys 1323 1324 def iter_entries_prefix(self, keys): 1325 """Iterate over keys within the index using prefix matching. 1326 1327 Prefix matching is applied within the tuple of a key, not to within 1328 the bytestring of each key element. e.g. if you have the keys ('foo', 1329 'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then 1330 only the former key is returned. 1331 1332 WARNING: Note that this method currently causes a full index parse 1333 unconditionally (which is reasonably appropriate as it is a means for 1334 thunking many small indices into one larger one and still supplies 1335 iter_all_entries at the thunk layer). 1336 1337 :param keys: An iterable providing the key prefixes to be retrieved. 1338 Each key prefix takes the form of a tuple the length of a key, but 1339 with the last N elements 'None' rather than a regular bytestring. 1340 The first element cannot be 'None'. 1341 :return: An iterable as per iter_all_entries, but restricted to the 1342 keys with a matching prefix to those supplied. No additional keys 1343 will be returned, and every match that is in the index will be 1344 returned. 1345 """ 1346 keys = sorted(set(keys)) 1347 if not keys: 1348 return 1349 # Load if needed to check key lengths 1350 if self._key_count is None: 1351 self._get_root_node() 1352 # TODO: only access nodes that can satisfy the prefixes we are looking 1353 # for. For now, to meet API usage (as this function is not used by 1354 # current breezy) just suck the entire index and iterate in memory. 1355 nodes = {} 1356 if self.node_ref_lists: 1357 if self._key_length == 1: 1358 for _1, key, value, refs in self.iter_all_entries(): 1359 nodes[key] = value, refs 1360 else: 1361 nodes_by_key = {} 1362 for _1, key, value, refs in self.iter_all_entries(): 1363 key_value = key, value, refs 1364 # For a key of (foo, bar, baz) create 1365 # _nodes_by_key[foo][bar][baz] = key_value 1366 key_dict = nodes_by_key 1367 for subkey in key[:-1]: 1368 key_dict = key_dict.setdefault(subkey, {}) 1369 key_dict[key[-1]] = key_value 1370 else: 1371 if self._key_length == 1: 1372 for _1, key, value in self.iter_all_entries(): 1373 nodes[key] = value 1374 else: 1375 nodes_by_key = {} 1376 for _1, key, value in self.iter_all_entries(): 1377 key_value = key, value 1378 # For a key of (foo, bar, baz) create 1379 # _nodes_by_key[foo][bar][baz] = key_value 1380 key_dict = nodes_by_key 1381 for subkey in key[:-1]: 1382 key_dict = key_dict.setdefault(subkey, {}) 1383 key_dict[key[-1]] = key_value 1384 if self._key_length == 1: 1385 for key in keys: 1386 index._sanity_check_key(self, key) 1387 try: 1388 if self.node_ref_lists: 1389 value, node_refs = nodes[key] 1390 yield self, key, value, node_refs 1391 else: 1392 yield self, key, nodes[key] 1393 except KeyError: 1394 pass 1395 return 1396 for entry in index._iter_entries_prefix(self, nodes_by_key, keys): 1397 yield entry 1398 1399 def key_count(self): 1400 """Return an estimate of the number of keys in this index. 1401 1402 For BTreeGraphIndex the estimate is exact as it is contained in the 1403 header. 1404 """ 1405 if self._key_count is None: 1406 self._get_root_node() 1407 return self._key_count 1408 1409 def _compute_row_offsets(self): 1410 """Fill out the _row_offsets attribute based on _row_lengths.""" 1411 offsets = [] 1412 row_offset = 0 1413 for row in self._row_lengths: 1414 offsets.append(row_offset) 1415 row_offset += row 1416 offsets.append(row_offset) 1417 self._row_offsets = offsets 1418 1419 def _parse_header_from_bytes(self, bytes): 1420 """Parse the header from a region of bytes. 1421 1422 :param bytes: The data to parse. 1423 :return: An offset, data tuple such as readv yields, for the unparsed 1424 data. (which may be of length 0). 1425 """ 1426 signature = bytes[0:len(self._signature())] 1427 if not signature == self._signature(): 1428 raise index.BadIndexFormatSignature(self._name, BTreeGraphIndex) 1429 lines = bytes[len(self._signature()):].splitlines() 1430 options_line = lines[0] 1431 if not options_line.startswith(_OPTION_NODE_REFS): 1432 raise index.BadIndexOptions(self) 1433 try: 1434 self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):]) 1435 except ValueError: 1436 raise index.BadIndexOptions(self) 1437 options_line = lines[1] 1438 if not options_line.startswith(_OPTION_KEY_ELEMENTS): 1439 raise index.BadIndexOptions(self) 1440 try: 1441 self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):]) 1442 except ValueError: 1443 raise index.BadIndexOptions(self) 1444 options_line = lines[2] 1445 if not options_line.startswith(_OPTION_LEN): 1446 raise index.BadIndexOptions(self) 1447 try: 1448 self._key_count = int(options_line[len(_OPTION_LEN):]) 1449 except ValueError: 1450 raise index.BadIndexOptions(self) 1451 options_line = lines[3] 1452 if not options_line.startswith(_OPTION_ROW_LENGTHS): 1453 raise index.BadIndexOptions(self) 1454 try: 1455 self._row_lengths = [int(length) for length in 1456 options_line[len(_OPTION_ROW_LENGTHS):].split( 1457 b',') 1458 if length] 1459 except ValueError: 1460 raise index.BadIndexOptions(self) 1461 self._compute_row_offsets() 1462 1463 # calculate the bytes we have processed 1464 header_end = (len(signature) + sum(map(len, lines[0:4])) + 4) 1465 return header_end, bytes[header_end:] 1466 1467 def _read_nodes(self, nodes): 1468 """Read some nodes from disk into the LRU cache. 1469 1470 This performs a readv to get the node data into memory, and parses each 1471 node, then yields it to the caller. The nodes are requested in the 1472 supplied order. If possible doing sort() on the list before requesting 1473 a read may improve performance. 1474 1475 :param nodes: The nodes to read. 0 - first node, 1 - second node etc. 1476 :return: None 1477 """ 1478 # may be the byte string of the whole file 1479 bytes = None 1480 # list of (offset, length) regions of the file that should, evenually 1481 # be read in to data_ranges, either from 'bytes' or from the transport 1482 ranges = [] 1483 base_offset = self._base_offset 1484 for index in nodes: 1485 offset = (index * _PAGE_SIZE) 1486 size = _PAGE_SIZE 1487 if index == 0: 1488 # Root node - special case 1489 if self._size: 1490 size = min(_PAGE_SIZE, self._size) 1491 else: 1492 # The only case where we don't know the size, is for very 1493 # small indexes. So we read the whole thing 1494 bytes = self._transport.get_bytes(self._name) 1495 num_bytes = len(bytes) 1496 self._size = num_bytes - base_offset 1497 # the whole thing should be parsed out of 'bytes' 1498 ranges = [(start, min(_PAGE_SIZE, num_bytes - start)) 1499 for start in range( 1500 base_offset, num_bytes, _PAGE_SIZE)] 1501 break 1502 else: 1503 if offset > self._size: 1504 raise AssertionError('tried to read past the end' 1505 ' of the file %s > %s' 1506 % (offset, self._size)) 1507 size = min(size, self._size - offset) 1508 ranges.append((base_offset + offset, size)) 1509 if not ranges: 1510 return 1511 elif bytes is not None: 1512 # already have the whole file 1513 data_ranges = [(start, bytes[start:start + size]) 1514 for start, size in ranges] 1515 elif self._file is None: 1516 data_ranges = self._transport.readv(self._name, ranges) 1517 else: 1518 data_ranges = [] 1519 for offset, size in ranges: 1520 self._file.seek(offset) 1521 data_ranges.append((offset, self._file.read(size))) 1522 for offset, data in data_ranges: 1523 offset -= base_offset 1524 if offset == 0: 1525 # extract the header 1526 offset, data = self._parse_header_from_bytes(data) 1527 if len(data) == 0: 1528 continue 1529 bytes = zlib.decompress(data) 1530 if bytes.startswith(_LEAF_FLAG): 1531 node = self._leaf_factory(bytes, self._key_length, 1532 self.node_ref_lists) 1533 elif bytes.startswith(_INTERNAL_FLAG): 1534 node = _InternalNode(bytes) 1535 else: 1536 raise AssertionError("Unknown node type for %r" % bytes) 1537 yield offset // _PAGE_SIZE, node 1538 1539 def _signature(self): 1540 """The file signature for this index type.""" 1541 return _BTSIGNATURE 1542 1543 def validate(self): 1544 """Validate that everything in the index can be accessed.""" 1545 # just read and parse every node. 1546 self._get_root_node() 1547 if len(self._row_lengths) > 1: 1548 start_node = self._row_offsets[1] 1549 else: 1550 # We shouldn't be reading anything anyway 1551 start_node = 1 1552 node_end = self._row_offsets[-1] 1553 for node in self._read_nodes(list(range(start_node, node_end))): 1554 pass 1555 1556 1557_gcchk_factory = _LeafNode 1558 1559try: 1560 from . import _btree_serializer_pyx as _btree_serializer 1561 _gcchk_factory = _btree_serializer._parse_into_chk 1562except ImportError as e: 1563 osutils.failed_to_load_extension(e) 1564 from . import _btree_serializer_py as _btree_serializer 1565