/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
43 import org.apache.hadoop.io.WritableUtils; 44 45 /** 46 * Base class for all data block encoders that use a buffer. 47 */ 48 @InterfaceAudience.Private 49 abstract class BufferedDataBlockEncoder implements DataBlockEncoder { 50 51 private static int INITIAL_KEY_BUFFER_SIZE = 512; 52 53 @Override decodeKeyValues(DataInputStream source, HFileBlockDecodingContext blkDecodingCtx)54 public ByteBuffer decodeKeyValues(DataInputStream source, 55 HFileBlockDecodingContext blkDecodingCtx) throws IOException { 56 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) { 57 throw new IOException(this.getClass().getName() + " only accepts " 58 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context."); 59 } 60 61 HFileBlockDefaultDecodingContext decodingCtx = 62 (HFileBlockDefaultDecodingContext) blkDecodingCtx; 63 if (decodingCtx.getHFileContext().isIncludesTags() 64 && decodingCtx.getHFileContext().isCompressTags()) { 65 if (decodingCtx.getTagCompressionContext() != null) { 66 // It will be overhead to create the TagCompressionContext again and again for every block 67 // decoding. 
68 decodingCtx.getTagCompressionContext().clear(); 69 } else { 70 try { 71 TagCompressionContext tagCompressionContext = new TagCompressionContext( 72 LRUDictionary.class, Byte.MAX_VALUE); 73 decodingCtx.setTagCompressionContext(tagCompressionContext); 74 } catch (Exception e) { 75 throw new IOException("Failed to initialize TagCompressionContext", e); 76 } 77 } 78 } 79 return internalDecodeKeyValues(source, 0, 0, decodingCtx); 80 } 81 82 protected static class SeekerState implements Cell { 83 protected ByteBuffer currentBuffer; 84 protected TagCompressionContext tagCompressionContext; 85 protected int valueOffset = -1; 86 protected int keyLength; 87 protected int valueLength; 88 protected int lastCommonPrefix; 89 protected int tagsLength = 0; 90 protected int tagsOffset = -1; 91 protected int tagsCompressedLength = 0; 92 protected boolean uncompressTags = true; 93 94 /** We need to store a copy of the key. */ 95 protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE]; 96 protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE]; 97 98 protected long memstoreTS; 99 protected int nextKvOffset; 100 protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue(); 101 isValid()102 protected boolean isValid() { 103 return valueOffset != -1; 104 } 105 invalidate()106 protected void invalidate() { 107 valueOffset = -1; 108 tagsCompressedLength = 0; 109 currentKey = new KeyValue.KeyOnlyKeyValue(); 110 uncompressTags = true; 111 currentBuffer = null; 112 } 113 ensureSpaceForKey()114 protected void ensureSpaceForKey() { 115 if (keyLength > keyBuffer.length) { 116 // rare case, but we need to handle arbitrary length of key 117 int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2; 118 while (keyLength > newKeyBufferLength) { 119 newKeyBufferLength *= 2; 120 } 121 byte[] newKeyBuffer = new byte[newKeyBufferLength]; 122 System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length); 123 keyBuffer = newKeyBuffer; 124 } 125 } 126 
ensureSpaceForTags()127 protected void ensureSpaceForTags() { 128 if (tagsLength > tagsBuffer.length) { 129 // rare case, but we need to handle arbitrary length of tags 130 int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2; 131 while (tagsLength > newTagsBufferLength) { 132 newTagsBufferLength *= 2; 133 } 134 byte[] newTagsBuffer = new byte[newTagsBufferLength]; 135 System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length); 136 tagsBuffer = newTagsBuffer; 137 } 138 } 139 setKey(byte[] keyBuffer, long memTS)140 protected void setKey(byte[] keyBuffer, long memTS) { 141 currentKey.setKey(keyBuffer, 0, keyLength); 142 memstoreTS = memTS; 143 } 144 145 /** 146 * Copy the state from the next one into this instance (the previous state 147 * placeholder). Used to save the previous state when we are advancing the 148 * seeker to the next key/value. 149 */ copyFromNext(SeekerState nextState)150 protected void copyFromNext(SeekerState nextState) { 151 if (keyBuffer.length != nextState.keyBuffer.length) { 152 keyBuffer = nextState.keyBuffer.clone(); 153 } else if (!isValid()) { 154 // Note: we can only call isValid before we override our state, so this 155 // comes before all the assignments at the end of this method. 
156 System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, 157 nextState.keyLength); 158 } else { 159 // don't copy the common prefix between this key and the previous one 160 System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, 161 keyBuffer, nextState.lastCommonPrefix, nextState.keyLength 162 - nextState.lastCommonPrefix); 163 } 164 currentKey = nextState.currentKey; 165 166 valueOffset = nextState.valueOffset; 167 keyLength = nextState.keyLength; 168 valueLength = nextState.valueLength; 169 lastCommonPrefix = nextState.lastCommonPrefix; 170 nextKvOffset = nextState.nextKvOffset; 171 memstoreTS = nextState.memstoreTS; 172 currentBuffer = nextState.currentBuffer; 173 tagsOffset = nextState.tagsOffset; 174 tagsLength = nextState.tagsLength; 175 if (nextState.tagCompressionContext != null) { 176 tagCompressionContext = nextState.tagCompressionContext; 177 } 178 } 179 180 @Override getRowArray()181 public byte[] getRowArray() { 182 return currentKey.getRowArray(); 183 } 184 185 @Override getRowOffset()186 public int getRowOffset() { 187 return Bytes.SIZEOF_SHORT; 188 } 189 190 @Override getRowLength()191 public short getRowLength() { 192 return currentKey.getRowLength(); 193 } 194 195 @Override getFamilyArray()196 public byte[] getFamilyArray() { 197 return currentKey.getFamilyArray(); 198 } 199 200 @Override getFamilyOffset()201 public int getFamilyOffset() { 202 return currentKey.getFamilyOffset(); 203 } 204 205 @Override getFamilyLength()206 public byte getFamilyLength() { 207 return currentKey.getFamilyLength(); 208 } 209 210 @Override getQualifierArray()211 public byte[] getQualifierArray() { 212 return currentKey.getQualifierArray(); 213 } 214 215 @Override getQualifierOffset()216 public int getQualifierOffset() { 217 return currentKey.getQualifierOffset(); 218 } 219 220 @Override getQualifierLength()221 public int getQualifierLength() { 222 return currentKey.getQualifierLength(); 223 } 224 225 @Override getTimestamp()226 public long getTimestamp() 
{ 227 return currentKey.getTimestamp(); 228 } 229 230 @Override getTypeByte()231 public byte getTypeByte() { 232 return currentKey.getTypeByte(); 233 } 234 235 @Override getMvccVersion()236 public long getMvccVersion() { 237 return memstoreTS; 238 } 239 240 @Override getSequenceId()241 public long getSequenceId() { 242 return memstoreTS; 243 } 244 245 @Override getValueArray()246 public byte[] getValueArray() { 247 return currentBuffer.array(); 248 } 249 250 @Override getValueOffset()251 public int getValueOffset() { 252 return currentBuffer.arrayOffset() + valueOffset; 253 } 254 255 @Override getValueLength()256 public int getValueLength() { 257 return valueLength; 258 } 259 260 @Override getTagsArray()261 public byte[] getTagsArray() { 262 if (tagCompressionContext != null) { 263 return tagsBuffer; 264 } 265 return currentBuffer.array(); 266 } 267 268 @Override getTagsOffset()269 public int getTagsOffset() { 270 if (tagCompressionContext != null) { 271 return 0; 272 } 273 return currentBuffer.arrayOffset() + tagsOffset; 274 } 275 276 @Override getTagsLength()277 public int getTagsLength() { 278 return tagsLength; 279 } 280 281 @Override 282 @Deprecated getValue()283 public byte[] getValue() { 284 throw new UnsupportedOperationException("getValue() not supported"); 285 } 286 287 @Override 288 @Deprecated getFamily()289 public byte[] getFamily() { 290 throw new UnsupportedOperationException("getFamily() not supported"); 291 } 292 293 @Override 294 @Deprecated getQualifier()295 public byte[] getQualifier() { 296 throw new UnsupportedOperationException("getQualifier() not supported"); 297 } 298 299 @Override 300 @Deprecated getRow()301 public byte[] getRow() { 302 throw new UnsupportedOperationException("getRow() not supported"); 303 } 304 305 @Override toString()306 public String toString() { 307 return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen=" 308 + getValueLength() + "/seqid=" + memstoreTS; 309 } 310 shallowCopy()311 public 
Cell shallowCopy() { 312 return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(), 313 currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength, 314 currentKey.getQualifierOffset(), currentKey.getQualifierLength(), 315 currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset, 316 memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer); 317 } 318 } 319 320 /** 321 * Copies only the key part of the keybuffer by doing a deep copy and passes the 322 * seeker state members for taking a clone. 323 * Note that the value byte[] part is still pointing to the currentBuffer and the 324 * represented by the valueOffset and valueLength 325 */ 326 // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId 327 // there. So this has to be an instance of SettableSequenceId. SeekerState need not be 328 // SettableSequenceId as we never return that to top layers. When we have to, we make 329 // ClonedSeekerState from it. 
330 protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId { 331 private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT 332 + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) 333 + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY)); 334 private byte[] keyOnlyBuffer; 335 private ByteBuffer currentBuffer; 336 private short rowLength; 337 private int familyOffset; 338 private byte familyLength; 339 private int qualifierOffset; 340 private int qualifierLength; 341 private long timestamp; 342 private byte typeByte; 343 private int valueOffset; 344 private int valueLength; 345 private int tagsLength; 346 private int tagsOffset; 347 private byte[] cloneTagsBuffer; 348 private long seqId; 349 private TagCompressionContext tagCompressionContext; 350 ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength, int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId, int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext, byte[] tagsBuffer)351 protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength, 352 int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength, 353 long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId, 354 int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext, 355 byte[] tagsBuffer) { 356 this.currentBuffer = currentBuffer; 357 keyOnlyBuffer = new byte[keyLength]; 358 this.tagCompressionContext = tagCompressionContext; 359 this.rowLength = rowLength; 360 this.familyOffset = familyOffset; 361 this.familyLength = familyLength; 362 this.qualifierOffset = qualOffset; 363 this.qualifierLength = qualLength; 364 this.timestamp = timeStamp; 365 this.typeByte = typeByte; 366 this.valueLength = valueLen; 367 this.valueOffset = 
valueOffset; 368 this.tagsOffset = tagsOffset; 369 this.tagsLength = tagsLength; 370 System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength); 371 if (tagCompressionContext != null) { 372 this.cloneTagsBuffer = new byte[tagsLength]; 373 System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength); 374 } 375 setSequenceId(seqId); 376 } 377 378 @Override getRowArray()379 public byte[] getRowArray() { 380 return keyOnlyBuffer; 381 } 382 383 @Override getFamilyArray()384 public byte[] getFamilyArray() { 385 return keyOnlyBuffer; 386 } 387 388 @Override getQualifierArray()389 public byte[] getQualifierArray() { 390 return keyOnlyBuffer; 391 } 392 393 @Override getRowOffset()394 public int getRowOffset() { 395 return Bytes.SIZEOF_SHORT; 396 } 397 398 @Override getRowLength()399 public short getRowLength() { 400 return rowLength; 401 } 402 403 @Override getFamilyOffset()404 public int getFamilyOffset() { 405 return familyOffset; 406 } 407 408 @Override getFamilyLength()409 public byte getFamilyLength() { 410 return familyLength; 411 } 412 413 @Override getQualifierOffset()414 public int getQualifierOffset() { 415 return qualifierOffset; 416 } 417 418 @Override getQualifierLength()419 public int getQualifierLength() { 420 return qualifierLength; 421 } 422 423 @Override getTimestamp()424 public long getTimestamp() { 425 return timestamp; 426 } 427 428 @Override getTypeByte()429 public byte getTypeByte() { 430 return typeByte; 431 } 432 433 @Override 434 @Deprecated getMvccVersion()435 public long getMvccVersion() { 436 return getSequenceId(); 437 } 438 439 @Override getSequenceId()440 public long getSequenceId() { 441 return seqId; 442 } 443 444 @Override getValueArray()445 public byte[] getValueArray() { 446 return currentBuffer.array(); 447 } 448 449 @Override getValueOffset()450 public int getValueOffset() { 451 return currentBuffer.arrayOffset() + valueOffset; 452 } 453 454 @Override getValueLength()455 public int getValueLength() { 456 return valueLength; 
457 } 458 459 @Override getTagsArray()460 public byte[] getTagsArray() { 461 if (tagCompressionContext != null) { 462 return cloneTagsBuffer; 463 } 464 return currentBuffer.array(); 465 } 466 467 @Override getTagsOffset()468 public int getTagsOffset() { 469 if (tagCompressionContext != null) { 470 return 0; 471 } 472 return currentBuffer.arrayOffset() + tagsOffset; 473 } 474 475 @Override getTagsLength()476 public int getTagsLength() { 477 return tagsLength; 478 } 479 480 @Override 481 @Deprecated getValue()482 public byte[] getValue() { 483 return CellUtil.cloneValue(this); 484 } 485 486 @Override 487 @Deprecated getFamily()488 public byte[] getFamily() { 489 return CellUtil.cloneFamily(this); 490 } 491 492 @Override 493 @Deprecated getQualifier()494 public byte[] getQualifier() { 495 return CellUtil.cloneQualifier(this); 496 } 497 498 @Override 499 @Deprecated getRow()500 public byte[] getRow() { 501 return CellUtil.cloneRow(this); 502 } 503 504 @Override toString()505 public String toString() { 506 return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen=" 507 + getValueLength() + "/seqid=" + seqId; 508 } 509 510 @Override setSequenceId(long seqId)511 public void setSequenceId(long seqId) { 512 this.seqId = seqId; 513 } 514 515 @Override heapSize()516 public long heapSize() { 517 return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength; 518 } 519 } 520 521 protected abstract static class 522 BufferedEncodedSeeker<STATE extends SeekerState> 523 implements EncodedSeeker { 524 protected HFileBlockDecodingContext decodingCtx; 525 protected final KVComparator comparator; 526 protected final SamePrefixComparator<byte[]> samePrefixComparator; 527 protected ByteBuffer currentBuffer; 528 protected STATE current = createSeekerState(); // always valid 529 protected STATE previous = createSeekerState(); // may not be valid 530 protected TagCompressionContext tagCompressionContext = null; 531 
BufferedEncodedSeeker(KVComparator comparator, HFileBlockDecodingContext decodingCtx)532 public BufferedEncodedSeeker(KVComparator comparator, 533 HFileBlockDecodingContext decodingCtx) { 534 this.comparator = comparator; 535 this.samePrefixComparator = comparator; 536 this.decodingCtx = decodingCtx; 537 if (decodingCtx.getHFileContext().isCompressTags()) { 538 try { 539 tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 540 } catch (Exception e) { 541 throw new RuntimeException("Failed to initialize TagCompressionContext", e); 542 } 543 } 544 } 545 includesMvcc()546 protected boolean includesMvcc() { 547 return this.decodingCtx.getHFileContext().isIncludesMvcc(); 548 } 549 includesTags()550 protected boolean includesTags() { 551 return this.decodingCtx.getHFileContext().isIncludesTags(); 552 } 553 554 @Override compareKey(KVComparator comparator, byte[] key, int offset, int length)555 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) { 556 return comparator.compareFlatKey(key, offset, length, 557 current.keyBuffer, 0, current.keyLength); 558 } 559 560 @Override compareKey(KVComparator comparator, Cell key)561 public int compareKey(KVComparator comparator, Cell key) { 562 return comparator.compareOnlyKeyPortion(key, 563 new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength)); 564 } 565 566 @Override setCurrentBuffer(ByteBuffer buffer)567 public void setCurrentBuffer(ByteBuffer buffer) { 568 if (this.tagCompressionContext != null) { 569 this.tagCompressionContext.clear(); 570 } 571 currentBuffer = buffer; 572 current.currentBuffer = currentBuffer; 573 if(tagCompressionContext != null) { 574 current.tagCompressionContext = tagCompressionContext; 575 } 576 decodeFirst(); 577 current.setKey(current.keyBuffer, current.memstoreTS); 578 previous.invalidate(); 579 } 580 581 @Override getKeyDeepCopy()582 public ByteBuffer getKeyDeepCopy() { 583 ByteBuffer keyBuffer = 
ByteBuffer.allocate(current.keyLength); 584 keyBuffer.put(current.keyBuffer, 0, current.keyLength); 585 keyBuffer.rewind(); 586 return keyBuffer; 587 } 588 589 @Override getValueShallowCopy()590 public ByteBuffer getValueShallowCopy() { 591 ByteBuffer dup = currentBuffer.duplicate(); 592 dup.position(current.valueOffset); 593 dup.limit(current.valueOffset + current.valueLength); 594 return dup.slice(); 595 } 596 597 @Override getKeyValue()598 public Cell getKeyValue() { 599 return current.shallowCopy(); 600 } 601 602 @Override rewind()603 public void rewind() { 604 currentBuffer.rewind(); 605 if (tagCompressionContext != null) { 606 tagCompressionContext.clear(); 607 } 608 decodeFirst(); 609 current.setKey(current.keyBuffer, current.memstoreTS); 610 previous.invalidate(); 611 } 612 613 @Override next()614 public boolean next() { 615 if (!currentBuffer.hasRemaining()) { 616 return false; 617 } 618 decodeNext(); 619 current.setKey(current.keyBuffer, current.memstoreTS); 620 previous.invalidate(); 621 return true; 622 } 623 decodeTags()624 protected void decodeTags() { 625 current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer); 626 if (tagCompressionContext != null) { 627 if (current.uncompressTags) { 628 // Tag compression is been used. uncompress it into tagsBuffer 629 current.ensureSpaceForTags(); 630 try { 631 current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer, 632 current.tagsBuffer, 0, current.tagsLength); 633 } catch (IOException e) { 634 throw new RuntimeException("Exception while uncompressing tags", e); 635 } 636 } else { 637 ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength); 638 current.uncompressTags = true;// Reset this. 639 } 640 current.tagsOffset = -1; 641 } else { 642 // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer. 
643 // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer() 644 current.tagsOffset = currentBuffer.position(); 645 ByteBufferUtils.skip(currentBuffer, current.tagsLength); 646 } 647 } 648 649 @Override seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore)650 public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) { 651 return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore); 652 } 653 654 @Override seekToKeyInBlock(Cell seekCell, boolean seekBefore)655 public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) { 656 int rowCommonPrefix = 0; 657 int familyCommonPrefix = 0; 658 int qualCommonPrefix = 0; 659 previous.invalidate(); 660 KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue(); 661 do { 662 int comp; 663 if (samePrefixComparator != null) { 664 currentCell.setKey(current.keyBuffer, 0, current.keyLength); 665 if (current.lastCommonPrefix != 0) { 666 // The KV format has row key length also in the byte array. The 667 // common prefix 668 // includes it. 
So we need to subtract to find out the common prefix 669 // in the 670 // row part alone 671 rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2); 672 } 673 if (current.lastCommonPrefix <= 2) { 674 rowCommonPrefix = 0; 675 } 676 rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell, 677 rowCommonPrefix); 678 comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix); 679 if (comp == 0) { 680 comp = compareTypeBytes(seekCell, currentCell); 681 if (comp == 0) { 682 // Subtract the fixed row key length and the family key fixed length 683 familyCommonPrefix = Math.max( 684 0, 685 Math.min(familyCommonPrefix, 686 current.lastCommonPrefix - (3 + currentCell.getRowLength()))); 687 familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell, 688 currentCell, familyCommonPrefix); 689 comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell, 690 familyCommonPrefix); 691 if (comp == 0) { 692 // subtract the rowkey fixed length and the family key fixed 693 // length 694 qualCommonPrefix = Math.max( 695 0, 696 Math.min( 697 qualCommonPrefix, 698 current.lastCommonPrefix 699 - (3 + currentCell.getRowLength() + currentCell.getFamilyLength()))); 700 qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell, 701 currentCell, qualCommonPrefix); 702 comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell, 703 qualCommonPrefix); 704 if (comp == 0) { 705 comp = CellComparator.compareTimestamps(seekCell, currentCell); 706 if (comp == 0) { 707 // Compare types. Let the delete types sort ahead of puts; 708 // i.e. types 709 // of higher numbers sort before those of lesser numbers. 710 // Maximum 711 // (255) 712 // appears ahead of everything, and minimum (0) appears 713 // after 714 // everything. 
715 comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte()); 716 } 717 } 718 } 719 } 720 } 721 } else { 722 Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength); 723 comp = comparator.compareOnlyKeyPortion(seekCell, r); 724 } 725 if (comp == 0) { // exact match 726 if (seekBefore) { 727 if (!previous.isValid()) { 728 // The caller (seekBefore) has to ensure that we are not at the 729 // first key in the block. 730 throw new IllegalStateException("Cannot seekBefore if " 731 + "positioned at the first key in the block: key=" 732 + Bytes.toStringBinary(seekCell.getRowArray())); 733 } 734 moveToPrevious(); 735 return 1; 736 } 737 return 0; 738 } 739 740 if (comp < 0) { // already too large, check previous 741 if (previous.isValid()) { 742 moveToPrevious(); 743 } else { 744 return HConstants.INDEX_KEY_MAGIC; // using optimized index key 745 } 746 return 1; 747 } 748 749 // move to next, if more data is available 750 if (currentBuffer.hasRemaining()) { 751 previous.copyFromNext(current); 752 decodeNext(); 753 current.setKey(current.keyBuffer, current.memstoreTS); 754 } else { 755 break; 756 } 757 } while (true); 758 759 // we hit the end of the block, not an exact match 760 return 1; 761 } 762 compareTypeBytes(Cell key, Cell right)763 private int compareTypeBytes(Cell key, Cell right) { 764 if (key.getFamilyLength() + key.getQualifierLength() == 0 765 && key.getTypeByte() == Type.Minimum.getCode()) { 766 // left is "bigger", i.e. 
it appears later in the sorted order 767 return 1; 768 } 769 if (right.getFamilyLength() + right.getQualifierLength() == 0 770 && right.getTypeByte() == Type.Minimum.getCode()) { 771 return -1; 772 } 773 return 0; 774 } 775 776 moveToPrevious()777 private void moveToPrevious() { 778 if (!previous.isValid()) { 779 throw new IllegalStateException( 780 "Can move back only once and not in first key in the block."); 781 } 782 783 STATE tmp = previous; 784 previous = current; 785 current = tmp; 786 787 // move after last key value 788 currentBuffer.position(current.nextKvOffset); 789 // Already decoded the tag bytes. We cache this tags into current state and also the total 790 // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode 791 // the tags again. This might pollute the Data Dictionary what we use for the compression. 792 // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip 793 // 'tagsCompressedLength' bytes of source stream. 794 // See in decodeTags() 795 current.tagsBuffer = previous.tagsBuffer; 796 current.tagsCompressedLength = previous.tagsCompressedLength; 797 current.uncompressTags = false; 798 current.setKey(current.keyBuffer, current.memstoreTS); 799 previous.invalidate(); 800 } 801 802 @SuppressWarnings("unchecked") createSeekerState()803 protected STATE createSeekerState() { 804 // This will fail for non-default seeker state if the subclass does not 805 // override this method. 
806 return (STATE) new SeekerState(); 807 } 808 decodeFirst()809 abstract protected void decodeFirst(); decodeNext()810 abstract protected void decodeNext(); 811 } 812 813 /** 814 * @param cell 815 * @param out 816 * @param encodingCtx 817 * @return unencoded size added 818 * @throws IOException 819 */ afterEncodingKeyValue(Cell cell, DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx)820 protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out, 821 HFileBlockDefaultEncodingContext encodingCtx) throws IOException { 822 int size = 0; 823 if (encodingCtx.getHFileContext().isIncludesTags()) { 824 int tagsLength = cell.getTagsLength(); 825 ByteBufferUtils.putCompressedInt(out, tagsLength); 826 // There are some tags to be written 827 if (tagsLength > 0) { 828 TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext(); 829 // When tag compression is enabled, tagCompressionContext will have a not null value. Write 830 // the tags using Dictionary compression in such a case 831 if (tagCompressionContext != null) { 832 tagCompressionContext 833 .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength); 834 } else { 835 out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); 836 } 837 } 838 size += tagsLength + KeyValue.TAGS_LENGTH_SIZE; 839 } 840 if (encodingCtx.getHFileContext().isIncludesMvcc()) { 841 // Copy memstore timestamp from the byte buffer to the output stream. 842 long memstoreTS = cell.getSequenceId(); 843 WritableUtils.writeVLong(out, memstoreTS); 844 // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be 845 // avoided. 
846 size += WritableUtils.getVIntSize(memstoreTS); 847 } 848 return size; 849 } 850 afterDecodingKeyValue(DataInputStream source, ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx)851 protected final void afterDecodingKeyValue(DataInputStream source, 852 ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException { 853 if (decodingCtx.getHFileContext().isIncludesTags()) { 854 int tagsLength = ByteBufferUtils.readCompressedInt(source); 855 // Put as unsigned short 856 dest.put((byte) ((tagsLength >> 8) & 0xff)); 857 dest.put((byte) (tagsLength & 0xff)); 858 if (tagsLength > 0) { 859 TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext(); 860 // When tag compression is been used in this file, tagCompressionContext will have a not 861 // null value passed. 862 if (tagCompressionContext != null) { 863 tagCompressionContext.uncompressTags(source, dest, tagsLength); 864 } else { 865 ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength); 866 } 867 } 868 } 869 if (decodingCtx.getHFileContext().isIncludesMvcc()) { 870 long memstoreTS = -1; 871 try { 872 // Copy memstore timestamp from the data input stream to the byte 873 // buffer. 
874 memstoreTS = WritableUtils.readVLong(source); 875 ByteBufferUtils.writeVLong(dest, memstoreTS); 876 } catch (IOException ex) { 877 throw new RuntimeException("Unable to copy memstore timestamp " + 878 memstoreTS + " after decoding a key/value"); 879 } 880 } 881 } 882 883 @Override newDataBlockEncodingContext(DataBlockEncoding encoding, byte[] header, HFileContext meta)884 public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding, 885 byte[] header, HFileContext meta) { 886 return new HFileBlockDefaultEncodingContext(encoding, header, meta); 887 } 888 889 @Override newDataBlockDecodingContext(HFileContext meta)890 public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) { 891 return new HFileBlockDefaultDecodingContext(meta); 892 } 893 internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)894 protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source, 895 int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) 896 throws IOException; 897 898 /** 899 * Asserts that there is at least the given amount of unfilled space 900 * remaining in the given buffer. 901 * @param out typically, the buffer we are writing to 902 * @param length the required space in the buffer 903 * @throws EncoderBufferTooSmallException If there are no enough bytes. 
904 */ ensureSpace(ByteBuffer out, int length)905 protected static void ensureSpace(ByteBuffer out, int length) 906 throws EncoderBufferTooSmallException { 907 if (out.position() + length > out.limit()) { 908 throw new EncoderBufferTooSmallException( 909 "Buffer position=" + out.position() + 910 ", buffer limit=" + out.limit() + 911 ", length to be written=" + length); 912 } 913 } 914 915 @Override startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)916 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out) 917 throws IOException { 918 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) { 919 throw new IOException (this.getClass().getName() + " only accepts " 920 + HFileBlockDefaultEncodingContext.class.getName() + " as the " + 921 "encoding context."); 922 } 923 924 HFileBlockDefaultEncodingContext encodingCtx = 925 (HFileBlockDefaultEncodingContext) blkEncodingCtx; 926 encodingCtx.prepareEncoding(out); 927 if (encodingCtx.getHFileContext().isIncludesTags() 928 && encodingCtx.getHFileContext().isCompressTags()) { 929 if (encodingCtx.getTagCompressionContext() != null) { 930 // It will be overhead to create the TagCompressionContext again and again for every block 931 // encoding. 932 encodingCtx.getTagCompressionContext().clear(); 933 } else { 934 try { 935 TagCompressionContext tagCompressionContext = new TagCompressionContext( 936 LRUDictionary.class, Byte.MAX_VALUE); 937 encodingCtx.setTagCompressionContext(tagCompressionContext); 938 } catch (Exception e) { 939 throw new IOException("Failed to initialize TagCompressionContext", e); 940 } 941 } 942 } 943 ByteBufferUtils.putInt(out, 0); // DUMMY length. 
This will be updated in endBlockEncoding() 944 blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState()); 945 } 946 947 private static class BufferedDataBlockEncodingState extends EncodingState { 948 int unencodedDataSizeWritten = 0; 949 } 950 951 @Override encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)952 public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out) 953 throws IOException { 954 BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx 955 .getEncodingState(); 956 int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out); 957 state.unencodedDataSizeWritten += encodedKvSize; 958 return encodedKvSize; 959 } 960 internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx, DataOutputStream out)961 public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx, 962 DataOutputStream out) throws IOException; 963 964 @Override endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out, byte[] uncompressedBytesWithHeader)965 public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out, 966 byte[] uncompressedBytesWithHeader) throws IOException { 967 BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx 968 .getEncodingState(); 969 // Write the unencodedDataSizeWritten (with header size) 970 Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE 971 + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten 972 ); 973 if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) { 974 encodingCtx.postEncoding(BlockType.ENCODED_DATA); 975 } else { 976 encodingCtx.postEncoding(BlockType.DATA); 977 } 978 } 979 } 980