# Copyright (C) 2007-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Indexing facilities."""

__all__ = [
    'CombinedGraphIndex',
    'GraphIndex',
    'GraphIndexBuilder',
    'GraphIndexPrefixAdapter',
    'InMemoryGraphIndex',
    ]

from bisect import bisect_right
from io import BytesIO
import re

from ..lazy_import import lazy_import
lazy_import(globals(), """
from breezy import (
    bisect_multi,
    revision as _mod_revision,
    trace,
    )
""")
from .. import (
    debug,
    errors,
    )
from ..static_tuple import StaticTuple

_HEADER_READV = (0, 200)
_OPTION_KEY_ELEMENTS = b"key_elements="
_OPTION_LEN = b"len="
_OPTION_NODE_REFS = b"node_ref_lists="
_SIGNATURE = b"Bazaar Graph Index 1\n"


class BadIndexFormatSignature(errors.BzrError):

    _fmt = "%(value)s is not an index of type %(_type)s."

    def __init__(self, value, _type):
        errors.BzrError.__init__(self)
        self.value = value
        self._type = _type


class BadIndexData(errors.BzrError):

    _fmt = "Error in data for index %(value)s."

    def __init__(self, value):
        errors.BzrError.__init__(self)
        self.value = value


class BadIndexDuplicateKey(errors.BzrError):

    _fmt = "The key '%(key)s' is already in index '%(index)s'."

    def __init__(self, key, index):
        errors.BzrError.__init__(self)
        self.key = key
        self.index = index


class BadIndexKey(errors.BzrError):

    _fmt = "The key '%(key)s' is not a valid key."

    def __init__(self, key):
        errors.BzrError.__init__(self)
        self.key = key


class BadIndexOptions(errors.BzrError):

    _fmt = "Could not parse options for index %(value)s."

    def __init__(self, value):
        errors.BzrError.__init__(self)
        self.value = value


class BadIndexValue(errors.BzrError):

    _fmt = "The value '%(value)s' is not a valid value."

    def __init__(self, value):
        errors.BzrError.__init__(self)
        self.value = value


_whitespace_re = re.compile(b'[\t\n\x0b\x0c\r\x00 ]')
_newline_null_re = re.compile(b'[\n\0]')


def _has_key_from_parent_map(self, key):
    """Check if this index has one key.

    If it's possible to check for multiple keys at once through
    calling get_parent_map that should be faster.
    """
    return (key in self.get_parent_map([key]))


def _missing_keys_from_parent_map(self, keys):
    return set(keys) - set(self.get_parent_map(keys))


class GraphIndexBuilder(object):
    """A builder that can build a GraphIndex.

    The resulting graph has the structure::

      _SIGNATURE OPTIONS NODES NEWLINE
      _SIGNATURE     := 'Bazaar Graph Index 1' NEWLINE
      OPTIONS        := 'node_ref_lists=' DIGITS NEWLINE
      NODES          := NODE*
      NODE           := KEY NULL ABSENT? NULL REFERENCES NULL VALUE NEWLINE
      KEY            := Not-whitespace-utf8
      ABSENT         := 'a'
      REFERENCES     := REFERENCE_LIST (TAB REFERENCE_LIST){node_ref_lists - 1}
      REFERENCE_LIST := (REFERENCE (CR REFERENCE)*)?
      REFERENCE      := DIGITS  ; digits is the byte offset in the index of the
                                ; referenced key.
      VALUE          := no-newline-no-null-bytes
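
    For example (an illustrative, hand-written instance of the grammar
    above rather than captured builder output), a one-node index with
    key_elements=1 and a single, empty reference list could serialise,
    with \x00 standing for the NULL byte, as::

      Bazaar Graph Index 1
      node_ref_lists=1
      key_elements=1
      len=1
      rev-1\x00\x00\x00value bytes

    followed by a terminating blank line.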
    """

    def __init__(self, reference_lists=0, key_elements=1):
        """Create a GraphIndex builder.

        :param reference_lists: The number of node reference lists for each
            entry.
        :param key_elements: The number of bytestrings in each key.
        """
        self.reference_lists = reference_lists
        # A dict of {key: (absent, ref_lists, value)}
        self._nodes = {}
        # Keys that are referenced but not actually present in this index
        self._absent_keys = set()
        self._nodes_by_key = None
        self._key_length = key_elements
        self._optimize_for_size = False
        self._combine_backing_indices = True

    def _check_key(self, key):
        """Raise BadIndexKey if key is not a valid key for this index."""
        if type(key) not in (tuple, StaticTuple):
            raise BadIndexKey(key)
        if self._key_length != len(key):
            raise BadIndexKey(key)
        for element in key:
            if not element or type(element) != bytes or _whitespace_re.search(element) is not None:
                raise BadIndexKey(key)

    def _external_references(self):
        """Return references that are not present in this index."""
        keys = set()
        refs = set()
        # TODO: JAM 2008-11-21 This makes an assumption about how the
        #       reference lists are used. It is currently correct for
        #       pack-0.92 through 1.9, which use the node references (3rd
        #       column) second reference list as the compression parent.
        #       Perhaps this should be moved into something higher up the
        #       stack, since it makes assumptions about how the index is
        #       used.
        if self.reference_lists > 1:
            for node in self.iter_all_entries():
                keys.add(node[1])
                refs.update(node[3][1])
            return refs - keys
        else:
            # If reference_lists == 0 there can be no external references,
            # and if reference_lists == 1, then there isn't a place to
            # store the compression parent
            return set()

    def _get_nodes_by_key(self):
        if self._nodes_by_key is None:
            nodes_by_key = {}
            if self.reference_lists:
                for key, (absent, references, value) in self._nodes.items():
                    if absent:
                        continue
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key, value, references
            else:
                for key, (absent, references, value) in self._nodes.items():
                    if absent:
                        continue
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key, value
            self._nodes_by_key = nodes_by_key
        return self._nodes_by_key

    def _update_nodes_by_key(self, key, value, node_refs):
        """Update the _nodes_by_key dict with a new key.

        For a key of (foo, bar, baz) create
        _nodes_by_key[foo][bar][baz] = key_value
        """
        if self._nodes_by_key is None:
            return
        key_dict = self._nodes_by_key
        if self.reference_lists:
            key_value = StaticTuple(key, value, node_refs)
        else:
            key_value = StaticTuple(key, value)
        for subkey in key[:-1]:
            key_dict = key_dict.setdefault(subkey, {})
        key_dict[key[-1]] = key_value
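    # Illustrative note (not original code): for a 3-element key
    # (b'foo', b'bar', b'baz'), the structure built above nests as
    #   _nodes_by_key[b'foo'][b'bar'][b'baz'] = key_value
    # so prefix queries can walk one dictionary level per key element.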
    def _check_key_ref_value(self, key, references, value):
        """Check that 'key' and 'references' are all valid.

        :param key: A key tuple. Must conform to the key interface (be a
            tuple, be of the right length, not have any whitespace or nulls
            in any key element.)
        :param references: An iterable of reference lists. Something like
            [[(ref, key)], [(ref, key), (other, key)]]
        :param value: The value associated with this key. Must not contain
            newlines or null characters.
        :return: (node_refs, absent_references)

            * node_refs: basically a packed form of 'references' where all
              iterables are tuples
            * absent_references: reference keys that are not in self._nodes.
              This may contain duplicates if the same key is referenced in
              multiple lists.
        """
        as_st = StaticTuple.from_sequence
        self._check_key(key)
        if _newline_null_re.search(value) is not None:
            raise BadIndexValue(value)
        if len(references) != self.reference_lists:
            raise BadIndexValue(references)
        node_refs = []
        absent_references = []
        for reference_list in references:
            for reference in reference_list:
                # If reference *is* in self._nodes, then we know it has
                # already been checked.
                if reference not in self._nodes:
                    self._check_key(reference)
                    absent_references.append(reference)
            reference_list = as_st([as_st(ref).intern()
                                    for ref in reference_list])
            node_refs.append(reference_list)
        return as_st(node_refs), absent_references

    def add_node(self, key, value, references=()):
        """Add a node to the index.

        :param key: The key. keys are non-empty tuples containing
            as many whitespace-free utf8 bytestrings as the key length
            defined for this index.
        :param references: An iterable of iterables of keys. Each is a
            reference to another key.
        :param value: The value to associate with the key. It may be any
            bytes as long as it does not contain \\0 or \\n.
        """
        (node_refs,
         absent_references) = self._check_key_ref_value(key, references, value)
        if key in self._nodes and self._nodes[key][0] != b'a':
            raise BadIndexDuplicateKey(key, self)
        for reference in absent_references:
            # There may be duplicates, but I don't think it is worth
            # worrying about
            self._nodes[reference] = (b'a', (), b'')
        self._absent_keys.update(absent_references)
        self._absent_keys.discard(key)
        self._nodes[key] = (b'', node_refs, value)
        if self._nodes_by_key is not None and self._key_length > 1:
            self._update_nodes_by_key(key, value, node_refs)
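    # Illustrative usage sketch (not original code; uses only the API
    # defined above):
    #
    #   builder = GraphIndexBuilder(reference_lists=1)
    #   builder.add_node((b'rev-2',), b'100 200', ([(b'rev-1',)],))
    #   # (b'rev-1',) is now recorded as absent until it is added itself.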
    def clear_cache(self):
        """See GraphIndex.clear_cache()

        This is a no-op, but we need the api to conform to a generic 'Index'
        abstraction.
        """

    def finish(self):
        """Finish the index.

        :returns: A BytesIO holding the full content of the index as it
            should be written to disk.
        """
        lines = [_SIGNATURE]
        lines.append(b'%s%d\n' % (_OPTION_NODE_REFS, self.reference_lists))
        lines.append(b'%s%d\n' % (_OPTION_KEY_ELEMENTS, self._key_length))
        key_count = len(self._nodes) - len(self._absent_keys)
        lines.append(b'%s%d\n' % (_OPTION_LEN, key_count))
        prefix_length = sum(len(x) for x in lines)
        # references are byte offsets. To avoid having to do nasty
        # polynomial work to resolve offsets (references to later in the
        # file cannot be determined until all the intervening references
        # have been calculated too) we pad the offsets with 0's to make
        # them be of consistent length. Using binary offsets would break
        # the trivial file parsing.
        # to calculate the width of zeros needed we do three passes:
        # one to gather all the non-reference data and the number of
        # references.
        # one to pad all the data with reference-length and determine entry
        # addresses.
        # one to serialise.

        # forward sorted by key. In future we may consider topological
        # sorting, at the cost of table scans for direct lookup, or a
        # second index for direct lookup
        nodes = sorted(self._nodes.items())
        # if we do not prepass, we don't know how long it will be up front.
        expected_bytes = None
        # we only need to pre-pass if we have reference lists at all.
        if self.reference_lists:
            key_offset_info = []
            non_ref_bytes = prefix_length
            total_references = 0
            # TODO use simple multiplication for the constants in this loop.
            for key, (absent, references, value) in nodes:
                # record the offset known *so far* for this key:
                # the non reference bytes to date, and the total references
                # to date - saves reaccumulating on the second pass
                key_offset_info.append((key, non_ref_bytes,
                                        total_references))
                # key is literal, value is literal, there are 3 null's, 1 NL
                # key is variable length tuple, \x00 between elements
                non_ref_bytes += sum(len(element) for element in key)
                if self._key_length > 1:
                    non_ref_bytes += self._key_length - 1
                # value is literal bytes, there are 3 null's, 1 NL.
                non_ref_bytes += len(value) + 3 + 1
                # one byte for absent if set.
                if absent:
                    non_ref_bytes += 1
                elif self.reference_lists:
                    # (ref_lists -1) tabs
                    non_ref_bytes += self.reference_lists - 1
                    # (ref-1 cr's per ref_list)
                    for ref_list in references:
                        # how many references across the whole file?
                        total_references += len(ref_list)
                        # accrue reference separators
                        if ref_list:
                            non_ref_bytes += len(ref_list) - 1
            # how many digits are needed to represent the total byte count?
            digits = 1
            possible_total_bytes = non_ref_bytes + total_references * digits
            while 10 ** digits < possible_total_bytes:
                digits += 1
                possible_total_bytes = non_ref_bytes + total_references * digits
            expected_bytes = possible_total_bytes + 1  # terminating newline
            # resolve key addresses.
            key_addresses = {}
            for key, non_ref_bytes, total_references in key_offset_info:
                key_addresses[key] = non_ref_bytes + total_references * digits
            # serialise
            format_string = b'%%0%dd' % digits
        for key, (absent, references, value) in nodes:
            flattened_references = []
            for ref_list in references:
                ref_addresses = []
                for reference in ref_list:
                    ref_addresses.append(format_string %
                                         key_addresses[reference])
                flattened_references.append(b'\r'.join(ref_addresses))
            string_key = b'\x00'.join(key)
            lines.append(b"%s\x00%s\x00%s\x00%s\n" % (string_key, absent,
                b'\t'.join(flattened_references), value))
        lines.append(b'\n')
        result = BytesIO(b''.join(lines))
        if expected_bytes and len(result.getvalue()) != expected_bytes:
            raise errors.BzrError('Failed index creation. Internal error:'
                ' mismatched output length and expected length: %d %d' %
                (len(result.getvalue()), expected_bytes))
        return result
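    # Worked example (illustrative numbers): with non_ref_bytes == 1000
    # and total_references == 100, digits == 1 gives 1000 + 100*1 = 1100
    # possible bytes, which does not fit in 1 digit; the loop above
    # settles on digits == 4, since 1000 + 100*4 = 1400 <= 10**4, so
    # every reference is serialised as a zero-padded 4-digit offset.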
    def set_optimize(self, for_size=None, combine_backing_indices=None):
        """Change how the builder tries to optimize the result.

        :param for_size: Tell the builder to try and make the index as
            small as possible.
        :param combine_backing_indices: If the builder spills to disk to
            save memory, should the on-disk indices be combined. Set to
            True if you are going to be probing the index, but to False if
            you are not. (If you are not querying, then the time spent
            combining is wasted.)
        :return: None
        """
        # GraphIndexBuilder itself doesn't pay attention to the flag yet,
        # but other builders do.
        if for_size is not None:
            self._optimize_for_size = for_size
        if combine_backing_indices is not None:
            self._combine_backing_indices = combine_backing_indices

    def find_ancestry(self, keys, ref_list_num):
        """See CombinedGraphIndex.find_ancestry()"""
        pending = set(keys)
        parent_map = {}
        missing_keys = set()
        while pending:
            next_pending = set()
            for _, key, value, ref_lists in self.iter_entries(pending):
                parent_keys = ref_lists[ref_list_num]
                parent_map[key] = parent_keys
                next_pending.update([p for p in parent_keys if p not in
                                     parent_map])
            missing_keys.update(pending.difference(parent_map))
            pending = next_pending
        return parent_map, missing_keys


class GraphIndex(object):
    """An index for data with embedded graphs.

    The index maps keys to a list of key reference lists, and a value.
    Each node has the same number of key reference lists. Each key
    reference list can be empty or of arbitrary length. The value is an
    opaque NULL terminated string without any newlines. The storage of
    the index is hidden in the interface: keys and key references are
    always tuples of bytestrings, never the internal representation (e.g.
    dictionary offsets).

    It is presumed that the index will not be mutated - it is static data.

    Successive iter_all_entries calls will read the entire index each
    time. Additionally, iter_entries calls will read the index linearly
    until the desired keys are found. XXX: This must be fixed before the
    index is suitable for production use. :XXX
    """
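    # Illustrative usage sketch (not original code; the transport URL and
    # 'foo.rix' are made-up names, and get_transport is assumed from
    # breezy.transport):
    #
    #   from breezy.transport import get_transport
    #   t = get_transport('file:///path/to/repo')
    #   size = t.stat('foo.rix').st_size
    #   index = GraphIndex(t, 'foo.rix', size)
    #   for node in index.iter_all_entries():
    #       ...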
    def __init__(self, transport, name, size, unlimited_cache=False, offset=0):
        """Open an index called name on transport.

        :param transport: A breezy.transport.Transport.
        :param name: A path to provide to transport API calls.
        :param size: The size of the index in bytes. This is used for
            bisection logic to perform partial index reads. While the size
            could be obtained by statting the file this introduced an
            additional round trip as well as requiring stat'able
            transports, both of which are avoided by having it supplied.
            If size is None, then bisection support will be disabled and
            accessing the index will just stream all the data.
        :param offset: Instead of starting the index data at offset 0,
            start it at an arbitrary offset.
        """
        self._transport = transport
        self._name = name
        # Becomes a dict of key:(value, reference-list-byte-locations) used
        # by the bisection interface to store parsed but not resolved keys.
        self._bisect_nodes = None
        # Becomes a dict of key:(value, reference-list-keys) which are
        # ready to be returned directly to callers.
        self._nodes = None
        # a sorted list of slice-addresses for the parsed bytes of the
        # file. e.g. (0,1) would mean that byte 0 is parsed.
        self._parsed_byte_map = []
        # a sorted list of keys matching each slice address for parsed
        # bytes e.g. (None, 'foo@bar') would mean that the first byte
        # contained no key, and the end byte of the slice is the end of
        # the data for 'foo@bar'
        self._parsed_key_map = []
        self._key_count = None
        self._keys_by_offset = None
        self._nodes_by_key = None
        self._size = size
        # The number of bytes we've read so far in trying to process this
        # file
        self._bytes_read = 0
        self._base_offset = offset

    def __eq__(self, other):
        """Equal when self and other were created with the same parameters."""
        return (
            isinstance(self, type(other)) and
            self._transport == other._transport and
            self._name == other._name and
            self._size == other._size)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        # We don't really care about the order, just that there is an order.
        if (not isinstance(other, GraphIndex) and
                not isinstance(other, InMemoryGraphIndex)):
            raise TypeError(other)
        return hash(self) < hash(other)

    def __hash__(self):
        return hash((type(self), self._transport, self._name, self._size))

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__,
                           self._transport.abspath(self._name))

    def _buffer_all(self, stream=None):
        """Buffer all the index data.

        Mutates self._nodes and self._keys_by_offset.
        """
        if self._nodes is not None:
            # We already did this
            return
        if 'index' in debug.debug_flags:
            trace.mutter('Reading entire index %s',
                         self._transport.abspath(self._name))
        if stream is None:
            stream = self._transport.get(self._name)
            if self._base_offset != 0:
                # This is wasteful, but it is better than dealing with
                # adjusting all the offsets, etc.
                stream = BytesIO(stream.read()[self._base_offset:])
        try:
            self._read_prefix(stream)
            self._expected_elements = 3 + self._key_length
            line_count = 0
            # raw data keyed by offset
            self._keys_by_offset = {}
            # ready-to-return key:value or key:value, node_ref_lists
            self._nodes = {}
            self._nodes_by_key = None
            trailers = 0
            pos = stream.tell()
            lines = stream.read().split(b'\n')
        finally:
            stream.close()
        del lines[-1]
        _, _, _, trailers = self._parse_lines(lines, pos)
        for key, absent, references, value in self._keys_by_offset.values():
            if absent:
                continue
            # resolve references:
            if self.node_ref_lists:
                node_value = (value, self._resolve_references(references))
            else:
                node_value = value
            self._nodes[key] = node_value
        # cache the keys for quick set intersections
        if trailers != 1:
            # there must be one line - the empty trailer line.
            raise BadIndexData(self)

    def clear_cache(self):
        """Clear out any cached/memoized values.

        This can be called at any time, but generally it is used when we
        have extracted some information, but don't expect to be requesting
        any more from this index.
        """
    def external_references(self, ref_list_num):
        """Return references that are not present in this index."""
        self._buffer_all()
        if ref_list_num + 1 > self.node_ref_lists:
            raise ValueError('No ref list %d, index has %d ref lists'
                             % (ref_list_num, self.node_ref_lists))
        refs = set()
        nodes = self._nodes
        for key, (value, ref_lists) in nodes.items():
            ref_list = ref_lists[ref_list_num]
            refs.update([ref for ref in ref_list if ref not in nodes])
        return refs

    def _get_nodes_by_key(self):
        if self._nodes_by_key is None:
            nodes_by_key = {}
            if self.node_ref_lists:
                for key, (value, references) in self._nodes.items():
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key, value, references
            else:
                for key, value in self._nodes.items():
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key, value
            self._nodes_by_key = nodes_by_key
        return self._nodes_by_key

    def iter_all_entries(self):
        """Iterate over all keys within the index.

        :return: An iterable of (index, key, value) or (index, key, value,
            reference_lists). The former tuple is used when there are no
            reference lists in the index, making the API compatible with
            simple key:value index types. There is no defined order for
            the result iteration - it will be in the most efficient order
            for the index.
        """
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(3,
                "iter_all_entries scales with size of history.")
        if self._nodes is None:
            self._buffer_all()
        if self.node_ref_lists:
            for key, (value, node_ref_lists) in self._nodes.items():
                yield self, key, value, node_ref_lists
        else:
            for key, value in self._nodes.items():
                yield self, key, value

    def _read_prefix(self, stream):
        signature = stream.read(len(self._signature()))
        if not signature == self._signature():
            raise BadIndexFormatSignature(self._name, GraphIndex)
        options_line = stream.readline()
        if not options_line.startswith(_OPTION_NODE_REFS):
            raise BadIndexOptions(self)
        try:
            self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):-1])
        except ValueError:
            raise BadIndexOptions(self)
        options_line = stream.readline()
        if not options_line.startswith(_OPTION_KEY_ELEMENTS):
            raise BadIndexOptions(self)
        try:
            self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):-1])
        except ValueError:
            raise BadIndexOptions(self)
        options_line = stream.readline()
        if not options_line.startswith(_OPTION_LEN):
            raise BadIndexOptions(self)
        try:
            self._key_count = int(options_line[len(_OPTION_LEN):-1])
        except ValueError:
            raise BadIndexOptions(self)

    def _resolve_references(self, references):
        """Return the resolved key references for references.

        References are resolved by looking up the location of the key in
        the _keys_by_offset map and substituting the key name, preserving
        ordering.

        :param references: An iterable of iterables of key locations. e.g.
            [[123, 456], [123]]
        :return: A tuple of tuples of keys.
        """
        node_refs = []
        for ref_list in references:
            node_refs.append(
                tuple([self._keys_by_offset[ref][0] for ref in ref_list]))
        return tuple(node_refs)
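    # Illustrative example (made-up offsets): if _keys_by_offset holds
    # {123: ((b'a',), ...), 456: ((b'b',), ...)}, then
    # _resolve_references([[123, 456], [123]]) returns
    # (((b'a',), (b'b',)), ((b'a',),)).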
    @staticmethod
    def _find_index(range_map, key):
        """Helper for the _parsed_*_index calls.

        Given a range map - [(start, end), ...], finds the index of the
        range in the map for key if it is in the map, and if it is not
        there, the immediately preceding range in the map.
        """
        result = bisect_right(range_map, key) - 1
        if result + 1 < len(range_map):
            # check the border condition, it may be in result + 1
            if range_map[result + 1][0] == key[0]:
                return result + 1
        return result

    def _parsed_byte_index(self, offset):
        """Return the index of the entry immediately before offset.

        e.g. if the parsed map has regions 0,10 and 11,12 parsed, meaning
        that there is one unparsed byte (the 11th, addressed as [10]),
        then:
        asking for 0 will return 0
        asking for 10 will return 0
        asking for 11 will return 1
        asking for 12 will return 1
        """
        key = (offset, 0)
        return self._find_index(self._parsed_byte_map, key)

    def _parsed_key_index(self, key):
        """Return the index of the entry immediately before key.

        e.g. if the parsed map has regions (None, 'a') and ('b','c')
        parsed, meaning that keys from None to 'a' inclusive, and 'b' to
        'c' inclusive have been parsed, then:
        asking for '' will return 0
        asking for 'a' will return 0
        asking for 'b' will return 1
        asking for 'e' will return 1
        """
        search_key = (key, b'')
        return self._find_index(self._parsed_key_map, search_key)

    def _is_parsed(self, offset):
        """Returns True if offset has been parsed."""
        index = self._parsed_byte_index(offset)
        if index == len(self._parsed_byte_map):
            return offset < self._parsed_byte_map[index - 1][1]
        start, end = self._parsed_byte_map[index]
        return offset >= start and offset < end

    def _iter_entries_from_total_buffer(self, keys):
        """Iterate over keys when the entire index is parsed."""
        # Note: See the note in BTreeBuilder.iter_entries for why we don't
        # use .intersection() here
        nodes = self._nodes
        keys = [key for key in keys if key in nodes]
        if self.node_ref_lists:
            for key in keys:
                value, node_refs = nodes[key]
                yield self, key, value, node_refs
        else:
            for key in keys:
                yield self, key, nodes[key]

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys supplied. No additional keys will be returned, and every
            key supplied that is in the index will be returned.
        """
        keys = set(keys)
        if not keys:
            return []
        if self._size is None and self._nodes is None:
            self._buffer_all()

        # We fit about 20 keys per minimum-read (4K), so if we are looking
        # for more than 1/20th of the index it's likely (assuming
        # homogeneous key spread) that we'll read the entire index. If
        # we're going to do that, buffer the whole thing. A better
        # analysis might take key spread into account - but B+Tree indices
        # are better anyway.
        # We could look at all data read, and use a threshold there, which
        # will trigger on ancestry walks, but that is not yet fully mapped
        # out.
        if self._nodes is None and len(keys) * 20 > self.key_count():
            self._buffer_all()
        if self._nodes is not None:
            return self._iter_entries_from_total_buffer(keys)
        else:
            return (result[1] for result in bisect_multi.bisect_multi_bytes(
                self._lookup_keys_via_location, self._size, keys))
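    # Worked example (illustrative): for an index holding 10,000 keys, a
    # request for more than 500 of them (10,000 / 20) makes the heuristic
    # above buffer the whole file instead of bisecting per key.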
    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Prefix matching is applied within the tuple of a key, not to
        within the bytestring of each key element. e.g. if you have the
        keys ('foo', 'bar'), ('foobar', 'gam') and do a prefix search for
        ('foo', None) then only the former key is returned.

        WARNING: Note that this method currently causes a full index parse
        unconditionally (which is reasonably appropriate as it is a means
        for thunking many small indices into one larger one and still
        supplies iter_all_entries at the thunk layer).

        :param keys: An iterable providing the key prefixes to be
            retrieved. Each key prefix takes the form of a tuple the
            length of a key, but with the last N elements 'None' rather
            than a regular bytestring. The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to
            the keys with a matching prefix to those supplied. No
            additional keys will be returned, and every match that is in
            the index will be returned.
        """
        keys = set(keys)
        if not keys:
            return
        # load data - also finds key lengths
        if self._nodes is None:
            self._buffer_all()
        if self._key_length == 1:
            for key in keys:
                _sanity_check_key(self, key)
                if self.node_ref_lists:
                    value, node_refs = self._nodes[key]
                    yield self, key, value, node_refs
                else:
                    yield self, key, self._nodes[key]
            return
        nodes_by_key = self._get_nodes_by_key()
        for entry in _iter_entries_prefix(self, nodes_by_key, keys):
            yield entry

    def _find_ancestors(self, keys, ref_list_num, parent_map, missing_keys):
        """See BTreeIndex._find_ancestors."""
        # The API can be implemented as a trivial overlay on top of
        # iter_entries; it is not an efficient implementation, but it at
        # least gets the job done.
        found_keys = set()
        search_keys = set()
        for index, key, value, refs in self.iter_entries(keys):
            parent_keys = refs[ref_list_num]
            found_keys.add(key)
            parent_map[key] = parent_keys
            search_keys.update(parent_keys)
        # Figure out what, if anything, was missing
        missing_keys.update(set(keys).difference(found_keys))
        search_keys = search_keys.difference(parent_map)
        return search_keys

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For GraphIndex the estimate is exact.
        """
        if self._key_count is None:
            self._read_and_parse([_HEADER_READV])
        return self._key_count
    def _lookup_keys_via_location(self, location_keys):
        """Public interface for implementing bisection.

        If _buffer_all has been called, then all the data for the index is
        in memory, and this method should not be called, as it uses a
        separate cache because it cannot pre-resolve all indices, which
        buffer_all does for performance.

        :param location_keys: A list of location(byte offset), key tuples.
        :return: A list of (location_key, result) tuples as expected by
            breezy.bisect_multi.bisect_multi_bytes.
        """
        # Possible improvements:
        #  - only bisect lookup each key once
        #  - sort the keys first, and use that to reduce the bisection
        #    window
        # -----
        # this progresses in three parts:
        # read data
        # parse it
        # attempt to answer the question from the now in memory data.
        # build the readv request
        # for each location, ask for 800 bytes - much more than rows we've
        # seen anywhere.
        readv_ranges = []
        for location, key in location_keys:
            # can we answer from cache?
            if self._bisect_nodes and key in self._bisect_nodes:
                # We have the key parsed.
                continue
            index = self._parsed_key_index(key)
            if (len(self._parsed_key_map) and
                self._parsed_key_map[index][0] <= key and
                (self._parsed_key_map[index][1] >= key or
                 # end of the file has been parsed
                 self._parsed_byte_map[index][1] == self._size)):
                # the key has been parsed, so no lookup is needed even if
                # it's not present.
                continue
            # - if we have examined this part of the file already - yes
            index = self._parsed_byte_index(location)
            if (len(self._parsed_byte_map) and
                self._parsed_byte_map[index][0] <= location and
                self._parsed_byte_map[index][1] > location):
                # the byte region has been parsed, so no read is needed.
                continue
            length = 800
            if location + length > self._size:
                length = self._size - location
            # todo, trim out parsed locations.
            if length > 0:
                readv_ranges.append((location, length))
        # read the header if needed
        if self._bisect_nodes is None:
            readv_ranges.append(_HEADER_READV)
        self._read_and_parse(readv_ranges)
        result = []
        if self._nodes is not None:
            # _read_and_parse triggered a _buffer_all because we requested
            # the whole data range
            for location, key in location_keys:
                if key not in self._nodes:  # not present
                    result.append(((location, key), False))
                elif self.node_ref_lists:
                    value, refs = self._nodes[key]
                    result.append(((location, key),
                                   (self, key, value, refs)))
                else:
                    result.append(((location, key),
                                   (self, key, self._nodes[key])))
            return result
        # generate results:
        #  - figure out <, >, missing, present
        #  - resolve present references so we can return them.
        # keys that we cannot answer until we resolve references
        pending_references = []
        pending_locations = set()
        for location, key in location_keys:
            # can we answer from cache?
            if key in self._bisect_nodes:
                # the key has been parsed, so no lookup is needed
                if self.node_ref_lists:
                    # the references may not have been all parsed.
                    value, refs = self._bisect_nodes[key]
                    wanted_locations = []
                    for ref_list in refs:
                        for ref in ref_list:
                            if ref not in self._keys_by_offset:
                                wanted_locations.append(ref)
                    if wanted_locations:
                        pending_locations.update(wanted_locations)
                        pending_references.append((location, key))
                        continue
                    result.append(((location, key), (self, key,
                        value, self._resolve_references(refs))))
                else:
                    result.append(((location, key),
                                   (self, key, self._bisect_nodes[key])))
                continue
            else:
                # has the region the key should be in, been parsed?
                index = self._parsed_key_index(key)
                if (self._parsed_key_map[index][0] <= key and
                    (self._parsed_key_map[index][1] >= key or
                     # end of the file has been parsed
                     self._parsed_byte_map[index][1] == self._size)):
                    result.append(((location, key), False))
                    continue
            # no, is the key above or below the probed location:
            # get the range of the probed & parsed location
            index = self._parsed_byte_index(location)
            # if the key is below the start of the range, it's below
            if key < self._parsed_key_map[index][0]:
                direction = -1
            else:
                direction = +1
            result.append(((location, key), direction))
        readv_ranges = []
        # lookup data to resolve references
        for location in pending_locations:
            length = 800
            if location + length > self._size:
                length = self._size - location
            # TODO: trim out parsed locations (e.g. if the 800 is into the
            # parsed region trim it, and don't use the adjust_for_latency
            # facility)
            if length > 0:
                readv_ranges.append((location, length))
        self._read_and_parse(readv_ranges)
        if self._nodes is not None:
            # The _read_and_parse triggered a _buffer_all, grab the data
            # and return it
            for location, key in pending_references:
                value, refs = self._nodes[key]
                result.append(((location, key), (self, key, value, refs)))
            return result
        for location, key in pending_references:
            # answer key references we had to look-up-late.
            value, refs = self._bisect_nodes[key]
            result.append(((location, key), (self, key,
                value, self._resolve_references(refs))))
        return result
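    # Illustrative note (not original code): each (location, key) probe
    # above resolves to one of False (known missing), a full result tuple
    # (present), or -1/+1 (look lower/higher), which is the contract
    # bisect_multi.bisect_multi_bytes expects from this callback.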
    def _parse_header_from_bytes(self, bytes):
        """Parse the header from a region of bytes.

        :param bytes: The data to parse.
        :return: An offset, data tuple such as readv yields, for the
            unparsed data (which may be of length 0).
        """
        signature = bytes[0:len(self._signature())]
        if not signature == self._signature():
            raise BadIndexFormatSignature(self._name, GraphIndex)
        lines = bytes[len(self._signature()):].splitlines()
        options_line = lines[0]
        if not options_line.startswith(_OPTION_NODE_REFS):
            raise BadIndexOptions(self)
        try:
            self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):])
        except ValueError:
            raise BadIndexOptions(self)
        options_line = lines[1]
        if not options_line.startswith(_OPTION_KEY_ELEMENTS):
            raise BadIndexOptions(self)
        try:
            self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):])
        except ValueError:
            raise BadIndexOptions(self)
        options_line = lines[2]
        if not options_line.startswith(_OPTION_LEN):
            raise BadIndexOptions(self)
        try:
            self._key_count = int(options_line[len(_OPTION_LEN):])
        except ValueError:
            raise BadIndexOptions(self)
        # calculate the bytes we have processed
        header_end = (len(signature) + len(lines[0]) + len(lines[1]) +
                      len(lines[2]) + 3)
        self._parsed_bytes(0, (), header_end, ())
        # setup parsing state
        self._expected_elements = 3 + self._key_length
        # raw data keyed by offset
        self._keys_by_offset = {}
        # keys with the value and node references
        self._bisect_nodes = {}
        return header_end, bytes[header_end:]
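    # Illustrative example (hand-written header bytes): given data
    # beginning
    #   b'Bazaar Graph Index 1\nnode_ref_lists=1\nkey_elements=1\nlen=3\n'
    # the parser above sets node_ref_lists=1, _key_length=1 and
    # _key_count=3, and returns the offset just past the len= line along
    # with the remaining unparsed bytes.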
    def _parse_region(self, offset, data):
        """Parse node data returned from a readv operation.

        :param offset: The byte offset the data starts at.
        :param data: The data to parse.
        """
        # trim the data.
        # end first:
        end = offset + len(data)
        high_parsed = offset
        while True:
            # Trivial test - if the current index's end is within the
            # low-matching parsed range, we're done.
            index = self._parsed_byte_index(high_parsed)
            if end < self._parsed_byte_map[index][1]:
                return
            # print "[%d:%d]" % (offset, end), \
            #     self._parsed_byte_map[index:index + 2]
            high_parsed, last_segment = self._parse_segment(
                offset, data, end, index)
            if last_segment:
                return

    def _parse_segment(self, offset, data, end, index):
        """Parse one segment of data.

        :param offset: Where 'data' begins in the file.
        :param data: Some data to parse a segment of.
        :param end: Where data ends
        :param index: The current index into the parsed bytes map.
        :return: high_parsed_byte, last_segment.
            high_parsed_byte is the location of the highest parsed byte in
            this segment, last_segment is True if the parsed segment is
            the last possible one in the data block.
        """
        # default is to use all data
        trim_end = None
        # accommodate overlap with data before this.
        if offset < self._parsed_byte_map[index][1]:
            # overlaps the lower parsed region
            # skip the parsed data
            trim_start = self._parsed_byte_map[index][1] - offset
            # don't trim the start for \n
            start_adjacent = True
        elif offset == self._parsed_byte_map[index][1]:
            # abuts the lower parsed region
            # use all data
            trim_start = None
            # do not trim anything
            start_adjacent = True
        else:
            # does not overlap the lower parsed region
            # use all data
            trim_start = None
            # but trim the leading \n
            start_adjacent = False
        if end == self._size:
            # lines up to the end of all data:
            # use it all
            trim_end = None
            # do not strip to the last \n
            end_adjacent = True
            last_segment = True
        elif index + 1 == len(self._parsed_byte_map):
            # at the end of the parsed data
            # use it all
            trim_end = None
            # but strip to the last \n
            end_adjacent = False
            last_segment = True
        elif end == self._parsed_byte_map[index + 1][0]:
            # abuts the next parsed region
            # use it all
            trim_end = None
            # do not strip to the last \n
            end_adjacent = True
            last_segment = True
        elif end > self._parsed_byte_map[index + 1][0]:
            # overlaps into the next parsed region
            # only consider the unparsed data
            trim_end = self._parsed_byte_map[index + 1][0] - offset
            # do not strip to the last \n as we know it's an entire record
            end_adjacent = True
            last_segment = end < self._parsed_byte_map[index + 1][1]
        else:
            # does not overlap into the next region
            # use it all
            trim_end = None
            # but strip to the last \n
            end_adjacent = False
            last_segment = True
        # now find bytes to discard if needed
        if not start_adjacent:
            # work around python bug in rfind
            if trim_start is None:
                trim_start = data.find(b'\n') + 1
            else:
                trim_start = data.find(b'\n', trim_start) + 1
            if not (trim_start != 0):
                raise AssertionError('no \n was present')
            # print 'removing start', offset, trim_start, repr(data[:trim_start])
        if not end_adjacent:
            # work around python bug in rfind
            if trim_end is None:
                trim_end = data.rfind(b'\n') + 1
            else:
                trim_end = data.rfind(b'\n', None, trim_end) + 1
            if not (trim_end != 0):
                raise AssertionError('no \n was present')
            # print 'removing end', offset, trim_end, repr(data[trim_end:])
        # adjust offset and data to the parseable data.
        trimmed_data = data[trim_start:trim_end]
        if not (trimmed_data):
            raise AssertionError('read unneeded data [%d:%d] from [%d:%d]'
                % (trim_start, trim_end, offset, offset + len(data)))
        if trim_start:
            offset += trim_start
        # print "parsing", repr(trimmed_data)
        # splitlines mangles the \r delimiters.. don't use it.
        lines = trimmed_data.split(b'\n')
        del lines[-1]
        pos = offset
        first_key, last_key, nodes, _ = self._parse_lines(lines, pos)
        for key, value in nodes:
            self._bisect_nodes[key] = value
        self._parsed_bytes(offset, first_key,
                           offset + len(trimmed_data), last_key)
        return offset + len(trimmed_data), last_segment
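    # Illustrative example (made-up offsets): if bytes [0:100) are
    # already parsed and a readv returns data for [90:200), the segment
    # logic above sets trim_start to 10 (skipping the already-parsed
    # overlap) and, because offset 90 falls inside the parsed region,
    # treats the segment as start-adjacent so no leading partial record
    # needs to be discarded.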
    def _parse_lines(self, lines, pos):
        key = None
        first_key = None
        trailers = 0
        nodes = []
        for line in lines:
            if line == b'':
                # must be at the end
                if self._size:
                    if not (self._size == pos + 1):
                        raise AssertionError("%s %s" % (self._size, pos))
                trailers += 1
                continue
            elements = line.split(b'\0')
            if len(elements) != self._expected_elements:
                raise BadIndexData(self)
            # keys are tuples. Each element is a string that may occur many
            # times, so we intern them to save space. AB, RC, 200807
            key = tuple([element for element in elements[:self._key_length]])
            if first_key is None:
                first_key = key
            absent, references, value = elements[-3:]
            ref_lists = []
            for ref_string in references.split(b'\t'):
                ref_lists.append(tuple([
                    int(ref) for ref in ref_string.split(b'\r') if ref
                    ]))
            ref_lists = tuple(ref_lists)
            self._keys_by_offset[pos] = (key, absent, ref_lists, value)
            pos += len(line) + 1  # +1 for the \n
            if absent:
                continue
            if self.node_ref_lists:
                node_value = (value, ref_lists)
            else:
                node_value = value
            nodes.append((key, node_value))
            # print "parsed ", key
        return first_key, key, nodes, trailers
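    # Illustrative example (hand-written line, two reference lists): the
    # line b'rev-2\x00\x0042\r128\t\x00value' splits into key (b'rev-2',),
    # an empty absent marker (the node is present), reference lists
    # ((42, 128), ()) and value b'value'.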
    def _parsed_bytes(self, start, start_key, end, end_key):
        """Mark the bytes from start to end as parsed.

        Calling self._parsed_bytes(1,2) will mark one byte (the one at
        offset 1) as parsed.

        :param start: The start of the parsed region.
        :param end: The end of the parsed region.
        """
        index = self._parsed_byte_index(start)
        new_value = (start, end)
        new_key = (start_key, end_key)
        if index == -1:
            # first range parsed is always the beginning.
            self._parsed_byte_map.insert(index, new_value)
            self._parsed_key_map.insert(index, new_key)
            return
        # four cases:
        # new region
        # extend lower region
        # extend higher region
        # combine two regions
        if (index + 1 < len(self._parsed_byte_map) and
            self._parsed_byte_map[index][1] == start and
            self._parsed_byte_map[index + 1][0] == end):
            # combine two regions
            self._parsed_byte_map[index] = (self._parsed_byte_map[index][0],
                self._parsed_byte_map[index + 1][1])
            self._parsed_key_map[index] = (self._parsed_key_map[index][0],
                self._parsed_key_map[index + 1][1])
            del self._parsed_byte_map[index + 1]
            del self._parsed_key_map[index + 1]
        elif self._parsed_byte_map[index][1] == start:
            # extend the lower entry
            self._parsed_byte_map[index] = (
                self._parsed_byte_map[index][0], end)
            self._parsed_key_map[index] = (
                self._parsed_key_map[index][0], end_key)
        elif (index + 1 < len(self._parsed_byte_map) and
              self._parsed_byte_map[index + 1][0] == end):
            # extend the higher entry
            self._parsed_byte_map[index + 1] = (
                start, self._parsed_byte_map[index + 1][1])
            self._parsed_key_map[index + 1] = (
                start_key, self._parsed_key_map[index + 1][1])
        else:
            # new entry
            self._parsed_byte_map.insert(index + 1, new_value)
            self._parsed_key_map.insert(index + 1, new_key)

    def _read_and_parse(self, readv_ranges):
        """Read the ranges and parse the resulting data.

        :param readv_ranges: A prepared readv range list.
        """
        if not readv_ranges:
            return
        if self._nodes is None and self._bytes_read * 2 >= self._size:
            # We've already read more than 50% of the file and we are
            # about to request more data, just _buffer_all() and be done
            self._buffer_all()
            return

        base_offset = self._base_offset
        if base_offset != 0:
            # Rewrite the ranges for the offset
            readv_ranges = [(start + base_offset, size)
                            for start, size in readv_ranges]
        readv_data = self._transport.readv(self._name, readv_ranges, True,
                                           self._size + self._base_offset)
        # parse
        for offset, data in readv_data:
            offset -= base_offset
            self._bytes_read += len(data)
            if offset < 0:
                # transport.readv() expanded to extra data which isn't
                # part of this index
                data = data[-offset:]
                offset = 0
            if offset == 0 and len(data) == self._size:
                # We read the whole range, most likely because the
                # Transport upcast our readv ranges into one long request
                # for enough total data to grab the whole index.
                self._buffer_all(BytesIO(data))
                return
            if self._bisect_nodes is None:
                # this must be the start
                if not (offset == 0):
                    raise AssertionError()
                offset, data = self._parse_header_from_bytes(data)
            # print readv_ranges, "[%d:%d]" % (offset, offset + len(data))
            self._parse_region(offset, data)

    def _signature(self):
        """The file signature for this index type."""
        return _SIGNATURE

    def validate(self):
        """Validate that everything in the index can be accessed."""
        # iter_all validates completely at the moment, so just do that.
        for node in self.iter_all_entries():
            pass


class CombinedGraphIndex(object):
    """A GraphIndex made up from smaller GraphIndices.

    The backing indices must implement GraphIndex, and are presumed to be
    static data.

    Queries against the combined index will be made against the first
    index, and then the second and so on. The order of indices can thus
    influence performance significantly. For example, if one index is on
    local disk and a second on a remote server, the local disk index
    should be before the other in the index list.

    Also, queries tend to need results from the same indices as previous
    queries. So the indices will be reordered after every query to put
    the indices that had the result(s) of that query first (while
    otherwise preserving the relative ordering).
    """

    def __init__(self, indices, reload_func=None):
        """Create a CombinedGraphIndex backed by indices.

        :param indices: An ordered list of indices to query for data.
        :param reload_func: A function to call if we find we are missing
            an index. Should have the form reload_func() => True/False to
            indicate if reloading actually changed anything.
        """
        self._indices = indices
        self._reload_func = reload_func
        # Sibling indices are other CombinedGraphIndex that we should call
        # _move_to_front_by_name on when we auto-reorder ourself.
        self._sibling_indices = []
        # A list of names that corresponds to the instances in
        # self._indices, so _index_names[0] is always the name for
        # _indices[0], etc. Sibling indices must all use the same set of
        # names as each other.
        self._index_names = [None] * len(self._indices)

    def __repr__(self):
        return "%s(%s)" % (
            self.__class__.__name__,
            ', '.join(map(repr, self._indices)))

    def clear_cache(self):
        """See GraphIndex.clear_cache()"""
        for index in self._indices:
            index.clear_cache()

    def get_parent_map(self, keys):
        """See graph.StackedParentsProvider.get_parent_map"""
        search_keys = set(keys)
        if _mod_revision.NULL_REVISION in search_keys:
            search_keys.discard(_mod_revision.NULL_REVISION)
            found_parents = {_mod_revision.NULL_REVISION: []}
        else:
            found_parents = {}
        for index, key, value, refs in self.iter_entries(search_keys):
            parents = refs[0]
            if not parents:
                parents = (_mod_revision.NULL_REVISION,)
            found_parents[key] = parents
        return found_parents

    __contains__ = _has_key_from_parent_map

    def insert_index(self, pos, index, name=None):
        """Insert a new index in the list of indices to query.

        :param pos: The position to insert the index.
        :param index: The index to insert.
        :param name: a name for this index, e.g. a pack name. These names
            can be used to reflect index reorderings to related
            CombinedGraphIndex instances that use the same names. (see
            set_sibling_indices)
        """
        self._indices.insert(pos, index)
        self._index_names.insert(pos, name)

    def iter_all_entries(self):
        """Iterate over all keys within the index

        Duplicate keys across child indices are presumed to have the same
        value and are only reported once.

        :return: An iterable of (index, key, value) or (index, key, value,
            reference_lists). There is no defined order for the result
            iteration - it will be in the most efficient order for the
            index.
        """
        seen_keys = set()
        while True:
            try:
                for index in self._indices:
                    for node in index.iter_all_entries():
                        if node[1] not in seen_keys:
                            yield node
                            seen_keys.add(node[1])
                return
            except errors.NoSuchFile as e:
                if not self._try_reload(e):
                    raise

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        Duplicate keys across child indices are presumed to have the same
        value and are only reported once.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable of (index, key, value) or (index, key, value,
            reference_lists). There is no defined order for the result
            iteration - it will be in the most efficient order for the
            index.
        """
        keys = set(keys)
        hit_indices = []
        while True:
            try:
                for index in self._indices:
                    if not keys:
                        break
                    index_hit = False
                    for node in index.iter_entries(keys):
                        keys.remove(node[1])
                        yield node
                        index_hit = True
                    if index_hit:
                        hit_indices.append(index)
                break
            except errors.NoSuchFile as e:
                if not self._try_reload(e):
                    raise
        self._move_to_front(hit_indices)

    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Duplicate keys across child indices are presumed to have the same
        value and are only reported once.

        Prefix matching is applied within the tuple of a key, not to
        within the bytestring of each key element. e.g. if you have the
        keys ('foo', 'bar'), ('foobar', 'gam') and do a prefix search for
        ('foo', None) then only the former key is returned.

        :param keys: An iterable providing the key prefixes to be
            retrieved. Each key prefix takes the form of a tuple the
            length of a key, but with the last N elements 'None' rather
            than a regular bytestring. The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to
            the keys with a matching prefix to those supplied. No
            additional keys will be returned, and every match that is in
            the index will be returned.
        """
        keys = set(keys)
        if not keys:
            return
        seen_keys = set()
        hit_indices = []
        while True:
            try:
                for index in self._indices:
                    index_hit = False
                    for node in index.iter_entries_prefix(keys):
                        if node[1] in seen_keys:
                            continue
                        seen_keys.add(node[1])
                        yield node
                        index_hit = True
                    if index_hit:
                        hit_indices.append(index)
                break
            except errors.NoSuchFile as e:
                if not self._try_reload(e):
                    raise
        self._move_to_front(hit_indices)

    def _move_to_front(self, hit_indices):
        """Rearrange self._indices so that hit_indices are first.

        Order is maintained as much as possible, e.g. the first unhit
        index will be the first index in _indices after the hit_indices,
        and the hit_indices will be present in exactly the order they are
        passed to _move_to_front.

        _move_to_front propagates to all objects in self._sibling_indices
        by calling _move_to_front_by_name.
        """
        if self._indices[:len(hit_indices)] == hit_indices:
            # The 'hit_indices' are already at the front (and in the same
            # order), no need to re-order
            return
        hit_names = self._move_to_front_by_index(hit_indices)
        for sibling_idx in self._sibling_indices:
            sibling_idx._move_to_front_by_name(hit_names)

    def _move_to_front_by_index(self, hit_indices):
        """Core logic for _move_to_front.

        Returns a list of names corresponding to the hit_indices param.
        """
        indices_info = zip(self._index_names, self._indices)
        if 'index' in debug.debug_flags:
            indices_info = list(indices_info)
            trace.mutter('CombinedGraphIndex reordering: currently %r, '
                         'promoting %r', indices_info, hit_indices)
        hit_names = []
        unhit_names = []
        new_hit_indices = []
        unhit_indices = []

        for offset, (name, idx) in enumerate(indices_info):
            if idx in hit_indices:
                hit_names.append(name)
                new_hit_indices.append(idx)
                if len(new_hit_indices) == len(hit_indices):
                    # We've found all of the hit entries, everything else
                    # is unhit
                    unhit_names.extend(self._index_names[offset + 1:])
                    unhit_indices.extend(self._indices[offset + 1:])
                    break
            else:
                unhit_names.append(name)
                unhit_indices.append(idx)

        self._indices = new_hit_indices + unhit_indices
        self._index_names = hit_names + unhit_names
        if 'index' in debug.debug_flags:
            trace.mutter('CombinedGraphIndex reordered: %r', self._indices)
        return hit_names

    def _move_to_front_by_name(self, hit_names):
        """Moves indices named by 'hit_names' to front of the search
        order, as described in _move_to_front.
        """
        # Translate names to index instances, and then call
        # _move_to_front_by_index.
        indices_info = zip(self._index_names, self._indices)
        hit_indices = []
        for name, idx in indices_info:
            if name in hit_names:
                hit_indices.append(idx)
        self._move_to_front_by_index(hit_indices)
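    # Illustrative example (made-up index names): with _indices ==
    # [i1, i2, i3] and a query answered only by i3, _move_to_front([i3])
    # reorders the search order to [i3, i1, i2], keeping the unhit
    # indices in their original relative order.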
    def find_ancestry(self, keys, ref_list_num):
        """Find the complete ancestry for the given set of keys.

        Note that this is a whole-ancestry request, so it should be used
        sparingly.

        :param keys: An iterable of keys to look for
        :param ref_list_num: The reference list which references the
            parents we care about.
        :return: (parent_map, missing_keys)
        """
        # XXX: make this call _move_to_front?
        missing_keys = set()
        parent_map = {}
        keys_to_lookup = set(keys)
        generation = 0
        while keys_to_lookup:
            # keys that *all* indexes claim are missing, stop searching
            # them
            generation += 1
            all_index_missing = None
            # print 'gen\tidx\tsub\tn_keys\tn_pmap\tn_miss'
            # print '%4d\t\t\t%4d\t%5d\t%5d' % (generation, len(keys_to_lookup),
            #                                   len(parent_map),
            #                                   len(missing_keys))
            for index_idx, index in enumerate(self._indices):
                # TODO: we should probably be doing something with
                #       'missing_keys' since we've already determined that
                #       those revisions have not been found anywhere
                index_missing_keys = set()
                # Find all of the ancestry we can from this index
                # keep looking until the search_keys set is empty, which
                # means things we didn't find should be in
                # index_missing_keys
                search_keys = keys_to_lookup
                sub_generation = 0
                # print '    \t%2d\t\t%4d\t%5d\t%5d' % (
                #     index_idx, len(search_keys),
                #     len(parent_map), len(index_missing_keys))
                while search_keys:
                    sub_generation += 1
                    # TODO: ref_list_num should really be a parameter,
                    #       since CombinedGraphIndex does not know what
                    #       the ref lists mean.
                    search_keys = index._find_ancestors(search_keys,
                        ref_list_num, parent_map, index_missing_keys)
                    # print '    \t    \t%2d\t%4d\t%5d\t%5d' % (
                    #     sub_generation, len(search_keys),
                    #     len(parent_map), len(index_missing_keys))
                # Now set whatever was missing to be searched in the next
                # index
                keys_to_lookup = index_missing_keys
                if all_index_missing is None:
                    all_index_missing = set(index_missing_keys)
                else:
                    all_index_missing.intersection_update(index_missing_keys)
                if not keys_to_lookup:
                    break
            if all_index_missing is None:
                # There were no indexes, so all search keys are 'missing'
                missing_keys.update(keys_to_lookup)
                keys_to_lookup = None
            else:
                missing_keys.update(all_index_missing)
                keys_to_lookup.difference_update(all_index_missing)
        return parent_map, missing_keys

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For CombinedGraphIndex this is approximated by the sum of the keys
        of the child indices. As child indices may have duplicate keys
        this can have a maximum error of the number of child indices *
        largest number of keys in any index.
        """
        while True:
            try:
                return sum((index.key_count() for index in self._indices), 0)
            except errors.NoSuchFile as e:
                if not self._try_reload(e):
                    raise

    missing_keys = _missing_keys_from_parent_map

    def _try_reload(self, error):
        """We just got a NoSuchFile exception.

        Try to reload the indices; if that fails, just raise the current
        exception.
        """
        if self._reload_func is None:
            return False
        trace.mutter(
            'Trying to reload after getting exception: %s', str(error))
        if not self._reload_func():
            # We tried to reload, but nothing changed, so we fail anyway
            trace.mutter('_reload_func indicated nothing has changed.'
                         ' Raising original exception.')
            return False
        return True
1621 """ 1622 self._sibling_indices = sibling_combined_graph_indices 1623 1624 def validate(self): 1625 """Validate that everything in the index can be accessed.""" 1626 while True: 1627 try: 1628 for index in self._indices: 1629 index.validate() 1630 return 1631 except errors.NoSuchFile as e: 1632 if not self._try_reload(e): 1633 raise 1634 1635 1636class InMemoryGraphIndex(GraphIndexBuilder): 1637 """A GraphIndex which operates entirely out of memory and is mutable. 1638 1639 This is designed to allow the accumulation of GraphIndex entries during a 1640 single write operation, where the accumulated entries need to be immediately 1641 available - for example via a CombinedGraphIndex. 1642 """ 1643 1644 def add_nodes(self, nodes): 1645 """Add nodes to the index. 1646 1647 :param nodes: An iterable of (key, node_refs, value) entries to add. 1648 """ 1649 if self.reference_lists: 1650 for (key, value, node_refs) in nodes: 1651 self.add_node(key, value, node_refs) 1652 else: 1653 for (key, value) in nodes: 1654 self.add_node(key, value) 1655 1656 def iter_all_entries(self): 1657 """Iterate over all keys within the index 1658 1659 :return: An iterable of (index, key, reference_lists, value). There is no 1660 defined order for the result iteration - it will be in the most 1661 efficient order for the index (in this case dictionary hash order). 1662 """ 1663 if 'evil' in debug.debug_flags: 1664 trace.mutter_callsite(3, 1665 "iter_all_entries scales with size of history.") 1666 if self.reference_lists: 1667 for key, (absent, references, value) in self._nodes.items(): 1668 if not absent: 1669 yield self, key, value, references 1670 else: 1671 for key, (absent, references, value) in self._nodes.items(): 1672 if not absent: 1673 yield self, key, value 1674 1675 def iter_entries(self, keys): 1676 """Iterate over keys within the index. 1677 1678 :param keys: An iterable providing the keys to be retrieved. 1679 :return: An iterable of (index, key, value, reference_lists). There is no 1680 defined order for the result iteration - it will be in the most 1681 efficient order for the index (keys iteration order in this case). 1682 """ 1683 # Note: See BTreeBuilder.iter_entries for an explanation of why we 1684 # aren't using set().intersection() here 1685 nodes = self._nodes 1686 keys = [key for key in keys if key in nodes] 1687 if self.reference_lists: 1688 for key in keys: 1689 node = nodes[key] 1690 if not node[0]: 1691 yield self, key, node[2], node[1] 1692 else: 1693 for key in keys: 1694 node = nodes[key] 1695 if not node[0]: 1696 yield self, key, node[2] 1697 1698 def iter_entries_prefix(self, keys): 1699 """Iterate over keys within the index using prefix matching. 1700 1701 Prefix matching is applied within the tuple of a key, not to within 1702 the bytestring of each key element. e.g. if you have the keys ('foo', 1703 'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then 1704 only the former key is returned. 1705 1706 :param keys: An iterable providing the key prefixes to be retrieved. 1707 Each key prefix takes the form of a tuple the length of a key, but 1708 with the last N elements 'None' rather than a regular bytestring. 1709 The first element cannot be 'None'. 1710 :return: An iterable as per iter_all_entries, but restricted to the 1711 keys with a matching prefix to those supplied. No additional keys 1712 will be returned, and every match that is in the index will be 1713 returned. 
1714 """ 1715 keys = set(keys) 1716 if not keys: 1717 return 1718 if self._key_length == 1: 1719 for key in keys: 1720 _sanity_check_key(self, key) 1721 node = self._nodes[key] 1722 if node[0]: 1723 continue 1724 if self.reference_lists: 1725 yield self, key, node[2], node[1] 1726 else: 1727 yield self, key, node[2] 1728 return 1729 nodes_by_key = self._get_nodes_by_key() 1730 for entry in _iter_entries_prefix(self, nodes_by_key, keys): 1731 yield entry 1732 1733 def key_count(self): 1734 """Return an estimate of the number of keys in this index. 1735 1736 For InMemoryGraphIndex the estimate is exact. 1737 """ 1738 return len(self._nodes) - len(self._absent_keys) 1739 1740 def validate(self): 1741 """In memory index's have no known corruption at the moment.""" 1742 1743 def __lt__(self, other): 1744 # We don't really care about the order, just that there is an order. 1745 if (not isinstance(other, GraphIndex) and 1746 not isinstance(other, InMemoryGraphIndex)): 1747 raise TypeError(other) 1748 return hash(self) < hash(other) 1749 1750 1751class GraphIndexPrefixAdapter(object): 1752 """An adapter between GraphIndex with different key lengths. 1753 1754 Queries against this will emit queries against the adapted Graph with the 1755 prefix added, queries for all items use iter_entries_prefix. The returned 1756 nodes will have their keys and node references adjusted to remove the 1757 prefix. Finally, an add_nodes_callback can be supplied - when called the 1758 nodes and references being added will have prefix prepended. 1759 """ 1760 1761 def __init__(self, adapted, prefix, missing_key_length, 1762 add_nodes_callback=None): 1763 """Construct an adapter against adapted with prefix.""" 1764 self.adapted = adapted 1765 self.prefix_key = prefix + (None,) * missing_key_length 1766 self.prefix = prefix 1767 self.prefix_len = len(prefix) 1768 self.add_nodes_callback = add_nodes_callback 1769 1770 def add_nodes(self, nodes): 1771 """Add nodes to the index. 1772 1773 :param nodes: An iterable of (key, node_refs, value) entries to add. 1774 """ 1775 # save nodes in case its an iterator 1776 nodes = tuple(nodes) 1777 translated_nodes = [] 1778 try: 1779 # Add prefix_key to each reference node_refs is a tuple of tuples, 1780 # so split it apart, and add prefix_key to the internal reference 1781 for (key, value, node_refs) in nodes: 1782 adjusted_references = ( 1783 tuple(tuple(self.prefix + ref_node for ref_node in ref_list) 1784 for ref_list in node_refs)) 1785 translated_nodes.append((self.prefix + key, value, 1786 adjusted_references)) 1787 except ValueError: 1788 # XXX: TODO add an explicit interface for getting the reference list 1789 # status, to handle this bit of user-friendliness in the API more 1790 # explicitly. 1791 for (key, value) in nodes: 1792 translated_nodes.append((self.prefix + key, value)) 1793 self.add_nodes_callback(translated_nodes) 1794 1795 def add_node(self, key, value, references=()): 1796 """Add a node to the index. 1797 1798 :param key: The key. keys are non-empty tuples containing 1799 as many whitespace-free utf8 bytestrings as the key length 1800 defined for this index. 1801 :param references: An iterable of iterables of keys. Each is a 1802 reference to another key. 1803 :param value: The value to associate with the key. It may be any 1804 bytes as long as it does not contain \0 or \n. 
1805 """ 1806 self.add_nodes(((key, value, references), )) 1807 1808 def _strip_prefix(self, an_iter): 1809 """Strip prefix data from nodes and return it.""" 1810 for node in an_iter: 1811 # cross checks 1812 if node[1][:self.prefix_len] != self.prefix: 1813 raise BadIndexData(self) 1814 for ref_list in node[3]: 1815 for ref_node in ref_list: 1816 if ref_node[:self.prefix_len] != self.prefix: 1817 raise BadIndexData(self) 1818 yield node[0], node[1][self.prefix_len:], node[2], ( 1819 tuple(tuple(ref_node[self.prefix_len:] for ref_node in ref_list) 1820 for ref_list in node[3])) 1821 1822 def iter_all_entries(self): 1823 """Iterate over all keys within the index 1824 1825 iter_all_entries is implemented against the adapted index using 1826 iter_entries_prefix. 1827 1828 :return: An iterable of (index, key, reference_lists, value). There is no 1829 defined order for the result iteration - it will be in the most 1830 efficient order for the index (in this case dictionary hash order). 1831 """ 1832 return self._strip_prefix(self.adapted.iter_entries_prefix([self.prefix_key])) 1833 1834 def iter_entries(self, keys): 1835 """Iterate over keys within the index. 1836 1837 :param keys: An iterable providing the keys to be retrieved. 1838 :return: An iterable of (index, key, value, reference_lists). There is no 1839 defined order for the result iteration - it will be in the most 1840 efficient order for the index (keys iteration order in this case). 1841 """ 1842 return self._strip_prefix(self.adapted.iter_entries( 1843 self.prefix + key for key in keys)) 1844 1845 def iter_entries_prefix(self, keys): 1846 """Iterate over keys within the index using prefix matching. 1847 1848 Prefix matching is applied within the tuple of a key, not to within 1849 the bytestring of each key element. e.g. if you have the keys ('foo', 1850 'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then 1851 only the former key is returned. 1852 1853 :param keys: An iterable providing the key prefixes to be retrieved. 1854 Each key prefix takes the form of a tuple the length of a key, but 1855 with the last N elements 'None' rather than a regular bytestring. 1856 The first element cannot be 'None'. 1857 :return: An iterable as per iter_all_entries, but restricted to the 1858 keys with a matching prefix to those supplied. No additional keys 1859 will be returned, and every match that is in the index will be 1860 returned. 1861 """ 1862 return self._strip_prefix(self.adapted.iter_entries_prefix( 1863 self.prefix + key for key in keys)) 1864 1865 def key_count(self): 1866 """Return an estimate of the number of keys in this index. 1867 1868 For GraphIndexPrefixAdapter this is relatively expensive - key 1869 iteration with the prefix is done. 1870 """ 1871 return len(list(self.iter_all_entries())) 1872 1873 def validate(self): 1874 """Call the adapted's validate.""" 1875 self.adapted.validate() 1876 1877 1878def _sanity_check_key(index_or_builder, key): 1879 """Raise BadIndexKey if key cannot be used for prefix matching.""" 1880 if key[0] is None: 1881 raise BadIndexKey(key) 1882 if len(key) != index_or_builder._key_length: 1883 raise BadIndexKey(key) 1884 1885 1886def _iter_entries_prefix(index_or_builder, nodes_by_key, keys): 1887 """Helper for implementing prefix matching iterators.""" 1888 for key in keys: 1889 _sanity_check_key(index_or_builder, key) 1890 # find what it refers to: 1891 key_dict = nodes_by_key 1892 elements = list(key) 1893 # find the subdict whose contents should be returned. 
        try:
            while len(elements) and elements[0] is not None:
                key_dict = key_dict[elements[0]]
                elements.pop(0)
        except KeyError:
            # a non-existent lookup.
            continue
        if len(elements):
            dicts = [key_dict]
            while dicts:
                values_view = dicts.pop().values()
                # can't be empty or would not exist
                value = next(iter(values_view))
                if isinstance(value, dict):
                    # still descending, push values
                    dicts.extend(values_view)
                else:
                    # at leaf tuples, yield values
                    for value in values_view:
                        # each value is the key:value:node refs tuple
                        # ready to yield.
                        yield (index_or_builder, ) + value
        else:
            # the last thing looked up was a terminal element
            yield (index_or_builder, ) + key_dict
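

# A minimal, self-contained usage sketch (illustrative only; the keys and
# values below are hypothetical). It exercises the classes defined above:
# build a two-element-key in-memory index with one reference list, query it
# directly, then view it through a prefix adapter as a one-element-key index.
if __name__ == '__main__':
    demo_builder = InMemoryGraphIndex(reference_lists=1, key_elements=2)
    demo_builder.add_node((b'pack', b'rev-1'), b'value-1', ((),))
    demo_builder.add_node((b'pack', b'rev-2'), b'value-2',
                          (((b'pack', b'rev-1'),),))
    # Exact lookup, then prefix lookup, against the builder itself.
    print(list(demo_builder.iter_entries([(b'pack', b'rev-2')])))
    print(list(demo_builder.iter_entries_prefix([(b'pack', None)])))
    # The adapter strips the (b'pack',) prefix, exposing one-element keys.
    demo_adapter = GraphIndexPrefixAdapter(
        demo_builder, (b'pack',), 1,
        add_nodes_callback=demo_builder.add_nodes)
    print(list(demo_adapter.iter_all_entries()))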