1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package table 8 9import ( 10 "encoding/binary" 11 "fmt" 12 "io" 13 "sort" 14 "strings" 15 "sync" 16 17 "github.com/golang/snappy" 18 19 "github.com/syndtr/goleveldb/leveldb/cache" 20 "github.com/syndtr/goleveldb/leveldb/comparer" 21 "github.com/syndtr/goleveldb/leveldb/errors" 22 "github.com/syndtr/goleveldb/leveldb/filter" 23 "github.com/syndtr/goleveldb/leveldb/iterator" 24 "github.com/syndtr/goleveldb/leveldb/opt" 25 "github.com/syndtr/goleveldb/leveldb/storage" 26 "github.com/syndtr/goleveldb/leveldb/util" 27) 28 29// Reader errors. 30var ( 31 ErrNotFound = errors.ErrNotFound 32 ErrReaderReleased = errors.New("leveldb/table: reader released") 33 ErrIterReleased = errors.New("leveldb/table: iterator released") 34) 35 36// ErrCorrupted describes error due to corruption. This error will be wrapped 37// with errors.ErrCorrupted. 
38type ErrCorrupted struct { 39 Pos int64 40 Size int64 41 Kind string 42 Reason string 43} 44 45func (e *ErrCorrupted) Error() string { 46 return fmt.Sprintf("leveldb/table: corruption on %s (pos=%d): %s", e.Kind, e.Pos, e.Reason) 47} 48 49func max(x, y int) int { 50 if x > y { 51 return x 52 } 53 return y 54} 55 56type block struct { 57 bpool *util.BufferPool 58 bh blockHandle 59 data []byte 60 restartsLen int 61 restartsOffset int 62} 63 64func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) { 65 index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { 66 offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) 67 offset++ // shared always zero, since this is a restart point 68 v1, n1 := binary.Uvarint(b.data[offset:]) // key length 69 _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length 70 m := offset + n1 + n2 71 return cmp.Compare(b.data[m:m+int(v1)], key) > 0 72 }) + rstart - 1 73 if index < rstart { 74 // The smallest key is greater-than key sought. 
75 index = rstart 76 } 77 offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:])) 78 return 79} 80 81func (b *block) restartIndex(rstart, rlimit, offset int) int { 82 return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { 83 return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset 84 }) + rstart - 1 85} 86 87func (b *block) restartOffset(index int) int { 88 return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:])) 89} 90 91func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) { 92 if offset >= b.restartsOffset { 93 if offset != b.restartsOffset { 94 err = &ErrCorrupted{Reason: "entries offset not aligned"} 95 } 96 return 97 } 98 v0, n0 := binary.Uvarint(b.data[offset:]) // Shared prefix length 99 v1, n1 := binary.Uvarint(b.data[offset+n0:]) // Key length 100 v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length 101 m := n0 + n1 + n2 102 n = m + int(v1) + int(v2) 103 if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset { 104 err = &ErrCorrupted{Reason: "entries corrupted"} 105 return 106 } 107 key = b.data[offset+m : offset+m+int(v1)] 108 value = b.data[offset+m+int(v1) : offset+n] 109 nShared = int(v0) 110 return 111} 112 113func (b *block) Release() { 114 b.bpool.Put(b.data) 115 b.bpool = nil 116 b.data = nil 117} 118 119type dir int 120 121const ( 122 dirReleased dir = iota - 1 123 dirSOI 124 dirEOI 125 dirBackward 126 dirForward 127) 128 129type blockIter struct { 130 tr *Reader 131 block *block 132 blockReleaser util.Releaser 133 releaser util.Releaser 134 key, value []byte 135 offset int 136 // Previous offset, only filled by Next. 137 prevOffset int 138 prevNode []int 139 prevKeys []byte 140 restartIndex int 141 // Iterator direction. 142 dir dir 143 // Restart index slice range. 144 riStart int 145 riLimit int 146 // Offset slice range. 147 offsetStart int 148 offsetRealStart int 149 offsetLimit int 150 // Error. 
151 err error 152} 153 154func (i *blockIter) sErr(err error) { 155 i.err = err 156 i.key = nil 157 i.value = nil 158 i.prevNode = nil 159 i.prevKeys = nil 160} 161 162func (i *blockIter) reset() { 163 if i.dir == dirBackward { 164 i.prevNode = i.prevNode[:0] 165 i.prevKeys = i.prevKeys[:0] 166 } 167 i.restartIndex = i.riStart 168 i.offset = i.offsetStart 169 i.dir = dirSOI 170 i.key = i.key[:0] 171 i.value = nil 172} 173 174func (i *blockIter) isFirst() bool { 175 switch i.dir { 176 case dirForward: 177 return i.prevOffset == i.offsetRealStart 178 case dirBackward: 179 return len(i.prevNode) == 1 && i.restartIndex == i.riStart 180 } 181 return false 182} 183 184func (i *blockIter) isLast() bool { 185 switch i.dir { 186 case dirForward, dirBackward: 187 return i.offset == i.offsetLimit 188 } 189 return false 190} 191 192func (i *blockIter) First() bool { 193 if i.err != nil { 194 return false 195 } else if i.dir == dirReleased { 196 i.err = ErrIterReleased 197 return false 198 } 199 200 if i.dir == dirBackward { 201 i.prevNode = i.prevNode[:0] 202 i.prevKeys = i.prevKeys[:0] 203 } 204 i.dir = dirSOI 205 return i.Next() 206} 207 208func (i *blockIter) Last() bool { 209 if i.err != nil { 210 return false 211 } else if i.dir == dirReleased { 212 i.err = ErrIterReleased 213 return false 214 } 215 216 if i.dir == dirBackward { 217 i.prevNode = i.prevNode[:0] 218 i.prevKeys = i.prevKeys[:0] 219 } 220 i.dir = dirEOI 221 return i.Prev() 222} 223 224func (i *blockIter) Seek(key []byte) bool { 225 if i.err != nil { 226 return false 227 } else if i.dir == dirReleased { 228 i.err = ErrIterReleased 229 return false 230 } 231 232 ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key) 233 if err != nil { 234 i.sErr(err) 235 return false 236 } 237 i.restartIndex = ri 238 i.offset = max(i.offsetStart, offset) 239 if i.dir == dirSOI || i.dir == dirEOI { 240 i.dir = dirForward 241 } 242 for i.Next() { 243 if i.tr.cmp.Compare(i.key, key) >= 0 { 244 return true 245 } 246 
} 247 return false 248} 249 250func (i *blockIter) Next() bool { 251 if i.dir == dirEOI || i.err != nil { 252 return false 253 } else if i.dir == dirReleased { 254 i.err = ErrIterReleased 255 return false 256 } 257 258 if i.dir == dirSOI { 259 i.restartIndex = i.riStart 260 i.offset = i.offsetStart 261 } else if i.dir == dirBackward { 262 i.prevNode = i.prevNode[:0] 263 i.prevKeys = i.prevKeys[:0] 264 } 265 for i.offset < i.offsetRealStart { 266 key, value, nShared, n, err := i.block.entry(i.offset) 267 if err != nil { 268 i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) 269 return false 270 } 271 if n == 0 { 272 i.dir = dirEOI 273 return false 274 } 275 i.key = append(i.key[:nShared], key...) 276 i.value = value 277 i.offset += n 278 } 279 if i.offset >= i.offsetLimit { 280 i.dir = dirEOI 281 if i.offset != i.offsetLimit { 282 i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned")) 283 } 284 return false 285 } 286 key, value, nShared, n, err := i.block.entry(i.offset) 287 if err != nil { 288 i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) 289 return false 290 } 291 if n == 0 { 292 i.dir = dirEOI 293 return false 294 } 295 i.key = append(i.key[:nShared], key...) 296 i.value = value 297 i.prevOffset = i.offset 298 i.offset += n 299 i.dir = dirForward 300 return true 301} 302 303func (i *blockIter) Prev() bool { 304 if i.dir == dirSOI || i.err != nil { 305 return false 306 } else if i.dir == dirReleased { 307 i.err = ErrIterReleased 308 return false 309 } 310 311 var ri int 312 if i.dir == dirForward { 313 // Change direction. 314 i.offset = i.prevOffset 315 if i.offset == i.offsetRealStart { 316 i.dir = dirSOI 317 return false 318 } 319 ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset) 320 i.dir = dirBackward 321 } else if i.dir == dirEOI { 322 // At the end of iterator. 
323 i.restartIndex = i.riLimit 324 i.offset = i.offsetLimit 325 if i.offset == i.offsetRealStart { 326 i.dir = dirSOI 327 return false 328 } 329 ri = i.riLimit - 1 330 i.dir = dirBackward 331 } else if len(i.prevNode) == 1 { 332 // This is the end of a restart range. 333 i.offset = i.prevNode[0] 334 i.prevNode = i.prevNode[:0] 335 if i.restartIndex == i.riStart { 336 i.dir = dirSOI 337 return false 338 } 339 i.restartIndex-- 340 ri = i.restartIndex 341 } else { 342 // In the middle of restart range, get from cache. 343 n := len(i.prevNode) - 3 344 node := i.prevNode[n:] 345 i.prevNode = i.prevNode[:n] 346 // Get the key. 347 ko := node[0] 348 i.key = append(i.key[:0], i.prevKeys[ko:]...) 349 i.prevKeys = i.prevKeys[:ko] 350 // Get the value. 351 vo := node[1] 352 vl := vo + node[2] 353 i.value = i.block.data[vo:vl] 354 i.offset = vl 355 return true 356 } 357 // Build entries cache. 358 i.key = i.key[:0] 359 i.value = nil 360 offset := i.block.restartOffset(ri) 361 if offset == i.offset { 362 ri-- 363 if ri < 0 { 364 i.dir = dirSOI 365 return false 366 } 367 offset = i.block.restartOffset(ri) 368 } 369 i.prevNode = append(i.prevNode, offset) 370 for { 371 key, value, nShared, n, err := i.block.entry(offset) 372 if err != nil { 373 i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) 374 return false 375 } 376 if offset >= i.offsetRealStart { 377 if i.value != nil { 378 // Appends 3 variables: 379 // 1. Previous keys offset 380 // 2. Value offset in the data block 381 // 3. Value length 382 i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value)) 383 i.prevKeys = append(i.prevKeys, i.key...) 384 } 385 i.value = value 386 } 387 i.key = append(i.key[:nShared], key...) 388 offset += n 389 // Stop if target offset reached. 
390 if offset >= i.offset { 391 if offset != i.offset { 392 i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned")) 393 return false 394 } 395 396 break 397 } 398 } 399 i.restartIndex = ri 400 i.offset = offset 401 return true 402} 403 404func (i *blockIter) Key() []byte { 405 if i.err != nil || i.dir <= dirEOI { 406 return nil 407 } 408 return i.key 409} 410 411func (i *blockIter) Value() []byte { 412 if i.err != nil || i.dir <= dirEOI { 413 return nil 414 } 415 return i.value 416} 417 418func (i *blockIter) Release() { 419 if i.dir != dirReleased { 420 i.tr = nil 421 i.block = nil 422 i.prevNode = nil 423 i.prevKeys = nil 424 i.key = nil 425 i.value = nil 426 i.dir = dirReleased 427 if i.blockReleaser != nil { 428 i.blockReleaser.Release() 429 i.blockReleaser = nil 430 } 431 if i.releaser != nil { 432 i.releaser.Release() 433 i.releaser = nil 434 } 435 } 436} 437 438func (i *blockIter) SetReleaser(releaser util.Releaser) { 439 if i.dir == dirReleased { 440 panic(util.ErrReleased) 441 } 442 if i.releaser != nil && releaser != nil { 443 panic(util.ErrHasReleaser) 444 } 445 i.releaser = releaser 446} 447 448func (i *blockIter) Valid() bool { 449 return i.err == nil && (i.dir == dirBackward || i.dir == dirForward) 450} 451 452func (i *blockIter) Error() error { 453 return i.err 454} 455 456type filterBlock struct { 457 bpool *util.BufferPool 458 data []byte 459 oOffset int 460 baseLg uint 461 filtersNum int 462} 463 464func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool { 465 i := int(offset >> b.baseLg) 466 if i < b.filtersNum { 467 o := b.data[b.oOffset+i*4:] 468 n := int(binary.LittleEndian.Uint32(o)) 469 m := int(binary.LittleEndian.Uint32(o[4:])) 470 if n < m && m <= b.oOffset { 471 return filter.Contains(b.data[n:m], key) 472 } else if n == m { 473 return false 474 } 475 } 476 return true 477} 478 479func (b *filterBlock) Release() { 480 b.bpool.Put(b.data) 481 b.bpool = nil 482 b.data = nil 483} 484 485type 
indexIter struct { 486 *blockIter 487 tr *Reader 488 slice *util.Range 489 // Options 490 fillCache bool 491} 492 493func (i *indexIter) Get() iterator.Iterator { 494 value := i.Value() 495 if value == nil { 496 return nil 497 } 498 dataBH, n := decodeBlockHandle(value) 499 if n == 0 { 500 return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle")) 501 } 502 503 var slice *util.Range 504 if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) { 505 slice = i.slice 506 } 507 return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache) 508} 509 510// Reader is a table reader. 511type Reader struct { 512 mu sync.RWMutex 513 fd storage.FileDesc 514 reader io.ReaderAt 515 cache *cache.NamespaceGetter 516 err error 517 bpool *util.BufferPool 518 // Options 519 o *opt.Options 520 cmp comparer.Comparer 521 filter filter.Filter 522 verifyChecksum bool 523 524 dataEnd int64 525 metaBH, indexBH, filterBH blockHandle 526 indexBlock *block 527 filterBlock *filterBlock 528} 529 530func (r *Reader) blockKind(bh blockHandle) string { 531 switch bh.offset { 532 case r.metaBH.offset: 533 return "meta-block" 534 case r.indexBH.offset: 535 return "index-block" 536 case r.filterBH.offset: 537 if r.filterBH.length > 0 { 538 return "filter-block" 539 } 540 } 541 return "data-block" 542} 543 544func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error { 545 return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}} 546} 547 548func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error { 549 return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason) 550} 551 552func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error { 553 if cerr, ok := err.(*ErrCorrupted); ok { 554 cerr.Pos = int64(bh.offset) 555 cerr.Size = int64(bh.length) 556 cerr.Kind = r.blockKind(bh) 557 return &errors.ErrCorrupted{Fd: r.fd, Err: cerr} 
558 } 559 return err 560} 561 562func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) { 563 data := r.bpool.Get(int(bh.length + blockTrailerLen)) 564 if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF { 565 return nil, err 566 } 567 568 if verifyChecksum { 569 n := bh.length + 1 570 checksum0 := binary.LittleEndian.Uint32(data[n:]) 571 checksum1 := util.NewCRC(data[:n]).Value() 572 if checksum0 != checksum1 { 573 r.bpool.Put(data) 574 return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1)) 575 } 576 } 577 578 switch data[bh.length] { 579 case blockTypeNoCompression: 580 data = data[:bh.length] 581 case blockTypeSnappyCompression: 582 decLen, err := snappy.DecodedLen(data[:bh.length]) 583 if err != nil { 584 r.bpool.Put(data) 585 return nil, r.newErrCorruptedBH(bh, err.Error()) 586 } 587 decData := r.bpool.Get(decLen) 588 decData, err = snappy.Decode(decData, data[:bh.length]) 589 r.bpool.Put(data) 590 if err != nil { 591 r.bpool.Put(decData) 592 return nil, r.newErrCorruptedBH(bh, err.Error()) 593 } 594 data = decData 595 default: 596 r.bpool.Put(data) 597 return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length])) 598 } 599 return data, nil 600} 601 602func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) { 603 data, err := r.readRawBlock(bh, verifyChecksum) 604 if err != nil { 605 return nil, err 606 } 607 restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) 608 b := &block{ 609 bpool: r.bpool, 610 bh: bh, 611 data: data, 612 restartsLen: restartsLen, 613 restartsOffset: len(data) - (restartsLen+1)*4, 614 } 615 return b, nil 616} 617 618func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) { 619 if r.cache != nil { 620 var ( 621 err error 622 ch *cache.Handle 623 ) 624 if fillCache { 625 ch = r.cache.Get(bh.offset, 
func() (size int, value cache.Value) { 626 var b *block 627 b, err = r.readBlock(bh, verifyChecksum) 628 if err != nil { 629 return 0, nil 630 } 631 return cap(b.data), b 632 }) 633 } else { 634 ch = r.cache.Get(bh.offset, nil) 635 } 636 if ch != nil { 637 b, ok := ch.Value().(*block) 638 if !ok { 639 ch.Release() 640 return nil, nil, errors.New("leveldb/table: inconsistent block type") 641 } 642 return b, ch, err 643 } else if err != nil { 644 return nil, nil, err 645 } 646 } 647 648 b, err := r.readBlock(bh, verifyChecksum) 649 return b, b, err 650} 651 652func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) { 653 data, err := r.readRawBlock(bh, true) 654 if err != nil { 655 return nil, err 656 } 657 n := len(data) 658 if n < 5 { 659 return nil, r.newErrCorruptedBH(bh, "too short") 660 } 661 m := n - 5 662 oOffset := int(binary.LittleEndian.Uint32(data[m:])) 663 if oOffset > m { 664 return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset") 665 } 666 b := &filterBlock{ 667 bpool: r.bpool, 668 data: data, 669 oOffset: oOffset, 670 baseLg: uint(data[n-1]), 671 filtersNum: (m - oOffset) / 4, 672 } 673 return b, nil 674} 675 676func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) { 677 if r.cache != nil { 678 var ( 679 err error 680 ch *cache.Handle 681 ) 682 if fillCache { 683 ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) { 684 var b *filterBlock 685 b, err = r.readFilterBlock(bh) 686 if err != nil { 687 return 0, nil 688 } 689 return cap(b.data), b 690 }) 691 } else { 692 ch = r.cache.Get(bh.offset, nil) 693 } 694 if ch != nil { 695 b, ok := ch.Value().(*filterBlock) 696 if !ok { 697 ch.Release() 698 return nil, nil, errors.New("leveldb/table: inconsistent block type") 699 } 700 return b, ch, err 701 } else if err != nil { 702 return nil, nil, err 703 } 704 } 705 706 b, err := r.readFilterBlock(bh) 707 return b, b, err 708} 709 710func (r *Reader) 
getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) { 711 if r.indexBlock == nil { 712 return r.readBlockCached(r.indexBH, true, fillCache) 713 } 714 return r.indexBlock, util.NoopReleaser{}, nil 715} 716 717func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) { 718 if r.filterBlock == nil { 719 return r.readFilterBlockCached(r.filterBH, fillCache) 720 } 721 return r.filterBlock, util.NoopReleaser{}, nil 722} 723 724func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter { 725 bi := &blockIter{ 726 tr: r, 727 block: b, 728 blockReleaser: bReleaser, 729 // Valid key should never be nil. 730 key: make([]byte, 0), 731 dir: dirSOI, 732 riStart: 0, 733 riLimit: b.restartsLen, 734 offsetStart: 0, 735 offsetRealStart: 0, 736 offsetLimit: b.restartsOffset, 737 } 738 if slice != nil { 739 if slice.Start != nil { 740 if bi.Seek(slice.Start) { 741 bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset) 742 bi.offsetStart = b.restartOffset(bi.riStart) 743 bi.offsetRealStart = bi.prevOffset 744 } else { 745 bi.riStart = b.restartsLen 746 bi.offsetStart = b.restartsOffset 747 bi.offsetRealStart = b.restartsOffset 748 } 749 } 750 if slice.Limit != nil { 751 if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) { 752 bi.offsetLimit = bi.prevOffset 753 bi.riLimit = bi.restartIndex + 1 754 } 755 } 756 bi.reset() 757 if bi.offsetStart > bi.offsetLimit { 758 bi.sErr(errors.New("leveldb/table: invalid slice range")) 759 } 760 } 761 return bi 762} 763 764func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator { 765 b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache) 766 if err != nil { 767 return iterator.NewEmptyIterator(err) 768 } 769 return r.newBlockIter(b, rel, slice, false) 770} 771 772func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache 
bool) iterator.Iterator { 773 r.mu.RLock() 774 defer r.mu.RUnlock() 775 776 if r.err != nil { 777 return iterator.NewEmptyIterator(r.err) 778 } 779 780 return r.getDataIter(dataBH, slice, verifyChecksum, fillCache) 781} 782 783// NewIterator creates an iterator from the table. 784// 785// Slice allows slicing the iterator to only contains keys in the given 786// range. A nil Range.Start is treated as a key before all keys in the 787// table. And a nil Range.Limit is treated as a key after all keys in 788// the table. 789// 790// WARNING: Any slice returned by interator (e.g. slice returned by calling 791// Iterator.Key() or Iterator.Key() methods), its content should not be modified 792// unless noted otherwise. 793// 794// The returned iterator is not safe for concurrent use and should be released 795// after use. 796// 797// Also read Iterator documentation of the leveldb/iterator package. 798func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { 799 r.mu.RLock() 800 defer r.mu.RUnlock() 801 802 if r.err != nil { 803 return iterator.NewEmptyIterator(r.err) 804 } 805 806 fillCache := !ro.GetDontFillCache() 807 indexBlock, rel, err := r.getIndexBlock(fillCache) 808 if err != nil { 809 return iterator.NewEmptyIterator(err) 810 } 811 index := &indexIter{ 812 blockIter: r.newBlockIter(indexBlock, rel, slice, true), 813 tr: r, 814 slice: slice, 815 fillCache: !ro.GetDontFillCache(), 816 } 817 return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader)) 818} 819 820func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) { 821 r.mu.RLock() 822 defer r.mu.RUnlock() 823 824 if r.err != nil { 825 err = r.err 826 return 827 } 828 829 indexBlock, rel, err := r.getIndexBlock(true) 830 if err != nil { 831 return 832 } 833 defer rel.Release() 834 835 index := r.newBlockIter(indexBlock, nil, nil, true) 836 defer index.Release() 837 838 if !index.Seek(key) { 839 if 
err = index.Error(); err == nil { 840 err = ErrNotFound 841 } 842 return 843 } 844 845 dataBH, n := decodeBlockHandle(index.Value()) 846 if n == 0 { 847 r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") 848 return nil, nil, r.err 849 } 850 851 // The filter should only used for exact match. 852 if filtered && r.filter != nil { 853 filterBlock, frel, ferr := r.getFilterBlock(true) 854 if ferr == nil { 855 if !filterBlock.contains(r.filter, dataBH.offset, key) { 856 frel.Release() 857 return nil, nil, ErrNotFound 858 } 859 frel.Release() 860 } else if !errors.IsCorrupted(ferr) { 861 return nil, nil, ferr 862 } 863 } 864 865 data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache()) 866 if !data.Seek(key) { 867 data.Release() 868 if err = data.Error(); err != nil { 869 return 870 } 871 872 // The nearest greater-than key is the first key of the next block. 873 if !index.Next() { 874 if err = index.Error(); err == nil { 875 err = ErrNotFound 876 } 877 return 878 } 879 880 dataBH, n = decodeBlockHandle(index.Value()) 881 if n == 0 { 882 r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") 883 return nil, nil, r.err 884 } 885 886 data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache()) 887 if !data.Next() { 888 data.Release() 889 if err = data.Error(); err == nil { 890 err = ErrNotFound 891 } 892 return 893 } 894 } 895 896 // Key doesn't use block buffer, no need to copy the buffer. 897 rkey = data.Key() 898 if !noValue { 899 if r.bpool == nil { 900 value = data.Value() 901 } else { 902 // Value does use block buffer, and since the buffer will be 903 // recycled, it need to be copied. 904 value = append([]byte{}, data.Value()...) 905 } 906 } 907 data.Release() 908 return 909} 910 911// Find finds key/value pair whose key is greater than or equal to the 912// given key. It returns ErrNotFound if the table doesn't contain 913// such pair. 
914// If filtered is true then the nearest 'block' will be checked against 915// 'filter data' (if present) and will immediately return ErrNotFound if 916// 'filter data' indicates that such pair doesn't exist. 917// 918// The caller may modify the contents of the returned slice as it is its 919// own copy. 920// It is safe to modify the contents of the argument after Find returns. 921func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) { 922 return r.find(key, filtered, ro, false) 923} 924 925// FindKey finds key that is greater than or equal to the given key. 926// It returns ErrNotFound if the table doesn't contain such key. 927// If filtered is true then the nearest 'block' will be checked against 928// 'filter data' (if present) and will immediately return ErrNotFound if 929// 'filter data' indicates that such key doesn't exist. 930// 931// The caller may modify the contents of the returned slice as it is its 932// own copy. 933// It is safe to modify the contents of the argument after Find returns. 934func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) { 935 rkey, _, err = r.find(key, filtered, ro, true) 936 return 937} 938 939// Get gets the value for the given key. It returns errors.ErrNotFound 940// if the table does not contain the key. 941// 942// The caller may modify the contents of the returned slice as it is its 943// own copy. 944// It is safe to modify the contents of the argument after Find returns. 945func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { 946 r.mu.RLock() 947 defer r.mu.RUnlock() 948 949 if r.err != nil { 950 err = r.err 951 return 952 } 953 954 rkey, value, err := r.find(key, false, ro, false) 955 if err == nil && r.cmp.Compare(rkey, key) != 0 { 956 value = nil 957 err = ErrNotFound 958 } 959 return 960} 961 962// OffsetOf returns approximate offset for the given key. 
963// 964// It is safe to modify the contents of the argument after Get returns. 965func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { 966 r.mu.RLock() 967 defer r.mu.RUnlock() 968 969 if r.err != nil { 970 err = r.err 971 return 972 } 973 974 indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true) 975 if err != nil { 976 return 977 } 978 defer rel.Release() 979 980 index := r.newBlockIter(indexBlock, nil, nil, true) 981 defer index.Release() 982 if index.Seek(key) { 983 dataBH, n := decodeBlockHandle(index.Value()) 984 if n == 0 { 985 r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") 986 return 987 } 988 offset = int64(dataBH.offset) 989 return 990 } 991 err = index.Error() 992 if err == nil { 993 offset = r.dataEnd 994 } 995 return 996} 997 998// Release implements util.Releaser. 999// It also close the file if it is an io.Closer. 1000func (r *Reader) Release() { 1001 r.mu.Lock() 1002 defer r.mu.Unlock() 1003 1004 if closer, ok := r.reader.(io.Closer); ok { 1005 closer.Close() 1006 } 1007 if r.indexBlock != nil { 1008 r.indexBlock.Release() 1009 r.indexBlock = nil 1010 } 1011 if r.filterBlock != nil { 1012 r.filterBlock.Release() 1013 r.filterBlock = nil 1014 } 1015 r.reader = nil 1016 r.cache = nil 1017 r.bpool = nil 1018 r.err = ErrReaderReleased 1019} 1020 1021// NewReader creates a new initialized table reader for the file. 1022// The fi, cache and bpool is optional and can be nil. 1023// 1024// The returned table reader instance is safe for concurrent use. 
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
	if f == nil {
		return nil, errors.New("leveldb/table: nil file")
	}

	r := &Reader{
		fd:             fd,
		reader:         f,
		cache:          cache,
		bpool:          bpool,
		o:              o,
		cmp:            o.GetComparer(),
		verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
	}

	// Convention below: table corruption is recorded as a sticky r.err and
	// the reader is still returned with a nil error; only I/O failures
	// return a non-nil error.
	if size < footerLen {
		r.err = r.newErrCorrupted(0, size, "table", "too small")
		return r, nil
	}

	// Read the fixed-size footer at the end of the file. io.EOF is
	// tolerated since ReadAt may return it alongside a full read.
	footerPos := size - footerLen
	var footer [footerLen]byte
	if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
		return nil, err
	}
	if string(footer[footerLen-len(magic):footerLen]) != magic {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")
		return r, nil
	}

	var n int
	// Decode the metaindex block handle.
	r.metaBH, n = decodeBlockHandle(footer[:])
	if n == 0 {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")
		return r, nil
	}

	// Decode the index block handle.
	r.indexBH, n = decodeBlockHandle(footer[n:])
	if n == 0 {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")
		return r, nil
	}

	// Read metaindex block.
	metaBlock, err := r.readBlock(r.metaBH, true)
	if err != nil {
		if errors.IsCorrupted(err) {
			r.err = err
			return r, nil
		}
		return nil, err
	}

	// Set data end.
	r.dataEnd = int64(r.metaBH.offset)

	// Read metaindex. Scan for a "filter.<name>" entry matching the
	// configured filter or one of the alternate filters.
	metaIter := r.newBlockIter(metaBlock, nil, nil, true)
	for metaIter.Next() {
		key := string(metaIter.Key())
		if !strings.HasPrefix(key, "filter.") {
			continue
		}
		fn := key[7:]
		if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
			r.filter = f0
		} else {
			for _, f0 := range o.GetAltFilters() {
				if f0.Name() == fn {
					r.filter = f0
					break
				}
			}
		}
		if r.filter != nil {
			filterBH, n := decodeBlockHandle(metaIter.Value())
			if n == 0 {
				continue
			}
			r.filterBH = filterBH
			// Update data end.
			r.dataEnd = int64(filterBH.offset)
			break
		}
	}
	metaIter.Release()
	metaBlock.Release()

	// Cache index and filter block locally, since we don't have global cache.
	if cache == nil {
		r.indexBlock, err = r.readBlock(r.indexBH, true)
		if err != nil {
			if errors.IsCorrupted(err) {
				r.err = err
				return r, nil
			}
			return nil, err
		}
		if r.filter != nil {
			r.filterBlock, err = r.readFilterBlock(r.filterBH)
			if err != nil {
				if !errors.IsCorrupted(err) {
					return nil, err
				}

				// Don't use filter then.
				r.filter = nil
			}
		}
	}

	return r, nil
}