/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
 *
 * Compress using:
 * - store size of common prefix
 * - save column family once in the first KeyValue
 * - use integer compression for key, value and prefix (7-bit encoding)
 * - use bits to avoid duplication key length, value length
 *   and type if it same as previous
 * - store in 3 bits length of prefix timestamp
 *    with previous KeyValue's timestamp
 * - one bit which allow to omit value if it is the same
 *
 * Format:
 * - 1 byte:    flag
 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
 * - 1-5 bytes: prefix length
 * - ... bytes: rest of the row (if prefix length is small enough)
 * - ... bytes: qualifier (or suffix depending on prefix length)
 * - 1-8 bytes: timestamp suffix
 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
 * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
 *
 */
@InterfaceAudience.Private
public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
  // Layout of the per-cell flag byte. These are compile-time constants shared
  // by every encoder instance, hence static.

  /** Low three bits of the flag: number of timestamp bytes shared with the previous cell. */
  static final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
  /** Shift applied to the timestamp-prefix length inside the flag. */
  static final int SHIFT_TIMESTAMP_LENGTH = 0;
  /** Set when the key length equals the previous cell's key length (length is then omitted). */
  static final int FLAG_SAME_KEY_LENGTH = 1 << 3;
  /** Set when the value length equals the previous cell's value length (length is then omitted). */
  static final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
  /** Set when the type byte equals the previous cell's type byte (type is then omitted). */
  static final int FLAG_SAME_TYPE = 1 << 5;
  /** Set when the value bytes equal the previous cell's value bytes (value is then omitted). */
  static final int FLAG_SAME_VALUE = 1 << 6;

  /**
   * Decompression state carried from one key/value to the next while a block
   * is expanded into a flat buffer by {@code uncompressSingleKeyValue}.
   */
  private static class FastDiffCompressionState extends CompressionState {
    byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
    /** Offset in the output buffer of the most recently written timestamp. */
    int prevTimestampOffset;

    @Override
    protected void readTimestamp(ByteBuffer in) {
      in.get(timestamp);
    }

    @Override
    void copyFrom(CompressionState state) {
      super.copyFrom(state);
      FastDiffCompressionState state2 = (FastDiffCompressionState) state;
      System.arraycopy(state2.timestamp, 0, timestamp, 0,
          KeyValue.TIMESTAMP_SIZE);
      prevTimestampOffset = state2.prevTimestampOffset;
    }

    /**
     * Copies the first key/value from the given stream, and initializes
     * decompression state based on it. Assumes that we have already read key
     * and value lengths. Does not set {@link #qualifierLength} (not used by
     * decompression) or {@link #prevOffset} (set by the caller afterwards).
     *
     * @param out buffer the flat key/value is written to, at its current position
     * @param in stream positioned at the first byte of the uncompressed key
     * @throws IOException if reading from {@code in} fails
     */
    private void decompressFirstKV(ByteBuffer out, DataInputStream in)
        throws IOException {
      int kvPos = out.position();
      out.putInt(keyLength);
      out.putInt(valueLength);
      // The timestamp and type are the trailing bytes of the key.
      prevTimestampOffset = out.position() + keyLength -
          KeyValue.TIMESTAMP_TYPE_SIZE;
      ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
      rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
      familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
          KeyValue.ROW_LENGTH_SIZE + rowLength);
      type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
    }

  }

  /**
   * Counts how many leading bytes two serialized timestamps share.
   *
   * @param curTsBuf serialized timestamp of the current cell
   * @param prevTsBuf serialized timestamp of the previous cell
   * @return number of equal leading bytes, capped at
   *         {@code KeyValue.TIMESTAMP_SIZE - 1} so that the result fits in the
   *         three bits selected by {@link #MASK_TIMESTAMP_LENGTH}
   */
  private static int findCommonTimestampPrefix(byte[] curTsBuf, byte[] prevTsBuf) {
    int commonPrefix = 0;
    while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
        && curTsBuf[commonPrefix] == prevTsBuf[commonPrefix]) {
      commonPrefix++;
    }
    return commonPrefix; // has to be at most 7 bytes
  }

  /**
   * Decodes one encoded key/value from {@code source} and appends it, in flat
   * KeyValue layout, to {@code out}. Bytes shared with the previous key/value
   * are copied from the position in {@code out} where that key/value was
   * written.
   *
   * @param source stream positioned at the flag byte of the next encoded cell
   * @param out buffer receiving the decoded key/value at its current position
   * @param state decompression state of the previous cell; updated in place
   * @throws IOException if reading from {@code source} fails
   * @throws EncoderBufferTooSmallException declared for {@code ensureSpace}
   */
  private void uncompressSingleKeyValue(DataInputStream source,
      ByteBuffer out, FastDiffCompressionState state)
          throws IOException, EncoderBufferTooSmallException {
    byte flag = source.readByte();
    // Needed later to locate the previous value, as state.keyLength may change.
    int prevKeyLength = state.keyLength;

    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
      state.keyLength = ByteBufferUtils.readCompressedInt(source);
    }
    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
      state.valueLength = ByteBufferUtils.readCompressedInt(source);
    }
    int commonLength = ByteBufferUtils.readCompressedInt(source);

    ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET);

    int kvPos = out.position();

    if (!state.isFirst()) {
      // copy the prefix
      int common;
      int prevOffset;

      // Whatever part of the length header is unchanged is copied from the
      // previous key/value together with the common key prefix.
      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
        out.putInt(state.keyLength);
        out.putInt(state.valueLength);
        prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
        common = commonLength;
      } else {
        if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
          prevOffset = state.prevOffset;
          common = commonLength + KeyValue.ROW_OFFSET;
        } else {
          out.putInt(state.keyLength);
          prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
          common = commonLength + KeyValue.KEY_LENGTH_SIZE;
        }
      }

      ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);

      // copy the rest of the key from the buffer
      int keyRestLength;
      if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
        // omit the family part of the key, it is always the same
        int rowWithSizeLength;
        int rowRestLength;

        // check length of row
        if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
          // not yet copied, do it now
          ByteBufferUtils.copyFromStreamToBuffer(out, source,
              KeyValue.ROW_LENGTH_SIZE - commonLength);

          rowWithSizeLength = out.getShort(out.position() -
              KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
          rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
        } else {
          // already in kvBuffer, just read it
          rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
              KeyValue.ROW_LENGTH_SIZE;
          rowRestLength = rowWithSizeLength - commonLength;
        }

        // copy the rest of row
        ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);

        // copy the column family
        ByteBufferUtils.copyFromBufferToBuffer(out, out,
            state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
                + state.rowLength, state.familyLength
                + KeyValue.FAMILY_LENGTH_SIZE);
        state.rowLength = (short) (rowWithSizeLength -
            KeyValue.ROW_LENGTH_SIZE);

        keyRestLength = state.keyLength - rowWithSizeLength -
            state.familyLength -
            (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
      } else {
        // prevRowWithSizeLength is the same as on previous row
        keyRestLength = state.keyLength - commonLength -
            KeyValue.TIMESTAMP_TYPE_SIZE;
      }
      // copy the rest of the key, after column family == column qualifier
      ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);

      // copy timestamp
      int prefixTimestamp =
          (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
      ByteBufferUtils.copyFromBufferToBuffer(out, out,
          state.prevTimestampOffset, prefixTimestamp);
      state.prevTimestampOffset = out.position() - prefixTimestamp;
      ByteBufferUtils.copyFromStreamToBuffer(out, source,
          KeyValue.TIMESTAMP_SIZE - prefixTimestamp);

      // copy the type and value
      if ((flag & FLAG_SAME_TYPE) != 0) {
        out.put(state.type);
        if ((flag & FLAG_SAME_VALUE) != 0) {
          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(out, source,
              state.valueLength);
        }
      } else {
        if ((flag & FLAG_SAME_VALUE) != 0) {
          ByteBufferUtils.copyFromStreamToBuffer(out, source,
              KeyValue.TYPE_SIZE);
          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(out, source,
              state.valueLength + KeyValue.TYPE_SIZE);
        }
        // Remember the freshly read type for the next cell.
        state.type = out.get(state.prevTimestampOffset +
            KeyValue.TIMESTAMP_SIZE);
      }
    } else { // this is the first element
      state.decompressFirstKV(out, source);
    }

    state.prevOffset = kvPos;
  }

  /**
   * Encodes {@code cell} relative to the previously encoded cell held in the
   * encoding state, then records {@code cell} as the new previous cell.
   *
   * @return the size returned by {@code compressSingleKeyValue} plus the size
   *         returned by {@code afterEncodingKeyValue}
   */
  @Override
  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
      DataOutputStream out) throws IOException {
    EncodingState state = encodingContext.getEncodingState();
    int size = compressSingleKeyValue(out, cell, state.prevCell);
    size += afterEncodingKeyValue(cell, out, encodingContext);
    state.prevCell = cell;
    return size;
  }

  /**
   * Writes the encoded form of {@code cell} to {@code out}, using
   * {@code prevCell} (if any) to elide shared key, timestamp, type and value
   * bytes.
   *
   * @param out destination stream
   * @param cell cell to encode
   * @param prevCell previously encoded cell, or {@code null} for the first cell
   * @return {@code keyLength + valueLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE}
   *         of {@code cell}, not the number of bytes written to {@code out}
   * @throws IOException if writing to {@code out} fails
   */
  private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
      throws IOException {
    byte flag = 0;
    int kLength = KeyValueUtil.keyLength(cell);
    int vLength = cell.getValueLength();

    if (prevCell == null) {
      // First cell: there is no previous cell to share a prefix with, so
      // write the lengths, a zero common prefix and the complete key.
      out.write(flag);
      ByteBufferUtils.putCompressedInt(out, kLength);
      ByteBufferUtils.putCompressedInt(out, vLength);
      ByteBufferUtils.putCompressedInt(out, 0);
      CellUtil.writeFlatKey(cell, out);
      // Write the value part
      out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
    } else {
      int preKeyLength = KeyValueUtil.keyLength(prevCell);
      int preValLength = prevCell.getValueLength();
      // find a common prefix and skip it
      int commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);

      if (kLength == preKeyLength) {
        flag |= FLAG_SAME_KEY_LENGTH;
      }
      if (vLength == preValLength) {
        flag |= FLAG_SAME_VALUE_LENGTH;
      }
      if (cell.getTypeByte() == prevCell.getTypeByte()) {
        flag |= FLAG_SAME_TYPE;
      }

      byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
      int commonTimestampPrefix = findCommonTimestampPrefix(curTsBuf,
          Bytes.toBytes(prevCell.getTimestamp()));

      flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;

      // Check if current and previous values are the same. Compare value
      // length first as an optimization.
      if (vLength == preValLength
          && Bytes.equals(cell.getValueArray(), cell.getValueOffset(), vLength,
              prevCell.getValueArray(), prevCell.getValueOffset(), preValLength)) {
        flag |= FLAG_SAME_VALUE;
      }

      out.write(flag);
      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
        ByteBufferUtils.putCompressedInt(out, kLength);
      }
      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
        ByteBufferUtils.putCompressedInt(out, vLength);
      }
      ByteBufferUtils.putCompressedInt(out, commonPrefix);
      short rLen = cell.getRowLength();
      if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
        // Previous and current rows are different. Copy the differing part of
        // the row, skip the column family, and copy the qualifier.
        CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
        out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
      } else {
        // The common part includes the whole row. As the column family is the
        // same across the whole file, it will automatically be included in the
        // common prefix, so we need not special-case it here.
        // What we write here is the non common part of the qualifier
        int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
            - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
        out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
            cell.getQualifierLength() - commonQualPrefix);
      }
      // Write non common ts part
      out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);

      // Write the type if it is not the same as before.
      if ((flag & FLAG_SAME_TYPE) == 0) {
        out.write(cell.getTypeByte());
      }

      // Write the value if it is not the same as before.
      if ((flag & FLAG_SAME_VALUE) == 0) {
        out.write(cell.getValueArray(), cell.getValueOffset(), vLength);
      }
    }
    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
  }

  /**
   * Decodes every key/value in {@code source} into a newly allocated buffer.
   * The stream must start with an int holding the decompressed size; the
   * buffer is allocated with {@code allocateHeaderLength} extra leading bytes
   * and decoding starts after them.
   *
   * @param source encoded block
   * @param allocateHeaderLength number of bytes reserved at the start of the result
   * @param skipLastBytes number of trailing bytes of {@code source} left unread
   * @param decodingCtx context passed through to {@code afterDecodingKeyValue}
   * @return the buffer holding the decoded key/values
   * @throws IOException if reading from {@code source} fails
   * @throws IllegalStateException if decoding consumed some of the last
   *         {@code skipLastBytes} bytes
   */
  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
        allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    FastDiffCompressionState state = new FastDiffCompressionState();
    while (source.available() > skipLastBytes) {
      uncompressSingleKeyValue(source, buffer, state);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too much bytes.");
    }

    return buffer;
  }

  /**
   * Returns a view of the first key stored in an encoded block.
   *
   * The position of {@code block} is restored before returning, but its mark
   * is overwritten by this call.
   *
   * @param block encoded block; the first cell is expected after one int and
   *        one byte (presumably the decompressed size and the first flag byte)
   * @return a slice of {@code block} covering exactly the first key
   */
  @Override
  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
    block.mark();
    block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
    int keyLength = ByteBufferUtils.readCompressedInt(block);
    ByteBufferUtils.readCompressedInt(block); // valueLength
    ByteBufferUtils.readCompressedInt(block); // commonLength
    int pos = block.position();
    block.reset();
    ByteBuffer dup = block.duplicate();
    dup.position(pos);
    dup.limit(pos + keyLength);
    return dup.slice();
  }

  @Override
  public String toString() {
    return FastDiffDeltaEncoder.class.getSimpleName();
  }

  /**
   * Seeker state that additionally remembers the previous cell's timestamp
   * and type bytes and the sizes of the row and family sections of the key.
   */
  protected static class FastDiffSeekerState extends SeekerState {
    private byte[] prevTimestampAndType =
        new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
    private int rowLengthWithSize;
    private int familyLengthWithSize;

    @Override
    protected void copyFromNext(SeekerState that) {
      super.copyFromNext(that);
      FastDiffSeekerState other = (FastDiffSeekerState) that;
      System.arraycopy(other.prevTimestampAndType, 0,
          prevTimestampAndType, 0,
          KeyValue.TIMESTAMP_TYPE_SIZE);
      rowLengthWithSize = other.rowLengthWithSize;
      familyLengthWithSize = other.familyLengthWithSize;
    }
  }

  /**
   * Creates a seeker that decodes cells of this encoding one at a time,
   * rebuilding each key in place inside the seeker state's key buffer.
   */
  @Override
  public EncodedSeeker createSeeker(KVComparator comparator,
      final HFileBlockDecodingContext decodingCtx) {
    return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator, decodingCtx) {
      /**
       * Decodes the cell at the current position of {@code currentBuffer}
       * into {@code current}.
       *
       * @param isFirst whether this is the first cell of the block
       */
      private void decode(boolean isFirst) {
        byte flag = currentBuffer.get();
        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
          if (!isFirst) {
            // The key length is about to change, which moves the timestamp
            // and type within keyBuffer; save the previous ones first.
            System.arraycopy(current.keyBuffer,
                current.keyLength - current.prevTimestampAndType.length,
                current.prevTimestampAndType, 0,
                current.prevTimestampAndType.length);
          }
          current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
        }
        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
          current.valueLength =
              ByteBufferUtils.readCompressedInt(currentBuffer);
        }
        current.lastCommonPrefix =
            ByteBufferUtils.readCompressedInt(currentBuffer);

        current.ensureSpaceForKey();

        if (isFirst) {
          // copy everything
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              current.keyLength - current.prevTimestampAndType.length);
          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
              Bytes.SIZEOF_SHORT;
          current.familyLengthWithSize =
              current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
        } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
          // length of row is different, copy everything except family

          // copy the row size
          int oldRowLengthWithSize = current.rowLengthWithSize;
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
              Bytes.SIZEOF_SHORT;

          // move the column family
          System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
              current.keyBuffer, current.rowLengthWithSize,
              current.familyLengthWithSize);

          // copy the rest of row
          currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
              current.rowLengthWithSize - Bytes.SIZEOF_SHORT);

          // copy the qualifier
          currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
              + current.familyLengthWithSize, current.keyLength
              - current.rowLengthWithSize - current.familyLengthWithSize
              - current.prevTimestampAndType.length);
        } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
          // We have to copy part of row and qualifier, but the column family
          // is in the right place.

          // before column family (rest of row)
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              current.rowLengthWithSize - current.lastCommonPrefix);

          // after column family (qualifier)
          currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
              + current.familyLengthWithSize, current.keyLength
              - current.rowLengthWithSize - current.familyLengthWithSize
              - current.prevTimestampAndType.length);
        } else {
          // copy just the ending
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              current.keyLength - current.prevTimestampAndType.length
                  - current.lastCommonPrefix);
        }

        // timestamp
        int pos = current.keyLength - current.prevTimestampAndType.length;
        int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
            SHIFT_TIMESTAMP_LENGTH;
        // With an unchanged key length the previous timestamp is still at
        // this position in keyBuffer, so the shared prefix only has to be
        // restored when the key length changed.
        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
          System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
              pos, commonTimestampPrefix);
        }
        pos += commonTimestampPrefix;
        currentBuffer.get(current.keyBuffer, pos,
            Bytes.SIZEOF_LONG - commonTimestampPrefix);
        pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;

        // type
        if ((flag & FLAG_SAME_TYPE) == 0) {
          currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
        } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
          current.keyBuffer[pos] =
              current.prevTimestampAndType[Bytes.SIZEOF_LONG];
        }

        // handle value
        if ((flag & FLAG_SAME_VALUE) == 0) {
          // When the value is shared, valueOffset keeps pointing at the
          // previous cell's value bytes.
          current.valueOffset = currentBuffer.position();
          ByteBufferUtils.skip(currentBuffer, current.valueLength);
        }

        if (includesTags()) {
          decodeTags();
        }
        if (includesMvcc()) {
          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
        } else {
          current.memstoreTS = 0;
        }
        current.nextKvOffset = currentBuffer.position();
      }

      @Override
      protected void decodeFirst() {
        // Skip the leading int of the block before the first cell.
        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
        decode(true);
      }

      @Override
      protected void decodeNext() {
        decode(false);
      }

      @Override
      protected FastDiffSeekerState createSeekerState() {
        return new FastDiffSeekerState();
      }
    };
  }
}