1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hbase.codec.prefixtree.encode; 20 21 import java.io.IOException; 22 import java.io.OutputStream; 23 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 import org.apache.hadoop.hbase.classification.InterfaceAudience; 27 import org.apache.hadoop.hbase.Cell; 28 import org.apache.hadoop.hbase.CellUtil; 29 import org.apache.hadoop.hbase.KeyValueUtil; 30 import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeBlockMeta; 31 import org.apache.hadoop.hbase.codec.prefixtree.encode.column.ColumnSectionWriter; 32 import org.apache.hadoop.hbase.codec.prefixtree.encode.other.CellTypeEncoder; 33 import org.apache.hadoop.hbase.codec.prefixtree.encode.other.ColumnNodeType; 34 import org.apache.hadoop.hbase.codec.prefixtree.encode.other.LongEncoder; 35 import org.apache.hadoop.hbase.codec.prefixtree.encode.row.RowSectionWriter; 36 import org.apache.hadoop.hbase.codec.prefixtree.encode.tokenize.Tokenizer; 37 import org.apache.hadoop.hbase.io.CellOutputStream; 38 import org.apache.hadoop.hbase.util.ArrayUtils; 39 import org.apache.hadoop.hbase.util.ByteRange; 40 import org.apache.hadoop.hbase.util.SimpleMutableByteRange; 41 import org.apache.hadoop.hbase.util.byterange.ByteRangeSet; 42 import org.apache.hadoop.hbase.util.byterange.impl.ByteRangeHashSet; 43 import org.apache.hadoop.hbase.util.byterange.impl.ByteRangeTreeSet; 44 import org.apache.hadoop.hbase.util.vint.UFIntTool; 45 import org.apache.hadoop.io.WritableUtils; 46 /** 47 * This is the primary class for converting a CellOutputStream into an encoded byte[]. As Cells are 48 * added they are completely copied into the various encoding structures. This is important because 49 * usually the cells being fed in during compactions will be transient.<br> 50 * <br> 51 * Usage:<br> 52 * 1) constructor<br> 53 * 4) append cells in sorted order: write(Cell cell)<br> 54 * 5) flush()<br> 55 */ 56 @InterfaceAudience.Private 57 public class PrefixTreeEncoder implements CellOutputStream { 58 59 /**************** static ************************/ 60 61 protected static final Log LOG = LogFactory.getLog(PrefixTreeEncoder.class); 62 63 //future-proof where HBase supports multiple families in a data block. 64 public static final boolean MULITPLE_FAMILIES_POSSIBLE = false; 65 66 private static final boolean USE_HASH_COLUMN_SORTER = true; 67 private static final int INITIAL_PER_CELL_ARRAY_SIZES = 256; 68 private static final int VALUE_BUFFER_INIT_SIZE = 64 * 1024; 69 70 71 /**************** fields *************************/ 72 73 protected long numResets = 0L; 74 75 protected OutputStream outputStream; 76 77 /* 78 * Cannot change during a single block's encoding. If false, then substitute incoming Cell's 79 * mvccVersion with zero and write out the block as usual. 80 */ 81 protected boolean includeMvccVersion; 82 83 /* 84 * reusable ByteRanges used for communicating with the sorters/compilers 85 */ 86 protected ByteRange rowRange; 87 protected ByteRange familyRange; 88 protected ByteRange qualifierRange; 89 protected ByteRange tagsRange; 90 91 /* 92 * incoming Cell fields are copied into these arrays 93 */ 94 protected long[] timestamps; 95 protected long[] mvccVersions; 96 protected byte[] typeBytes; 97 protected int[] valueOffsets; 98 protected int[] tagsOffsets; 99 protected byte[] values; 100 protected byte[] tags; 101 102 protected PrefixTreeBlockMeta blockMeta; 103 104 /* 105 * Sub-encoders for the simple long/byte fields of a Cell. Add to these as each cell arrives and 106 * compile before flushing. 107 */ 108 protected LongEncoder timestampEncoder; 109 protected LongEncoder mvccVersionEncoder; 110 protected CellTypeEncoder cellTypeEncoder; 111 112 /* 113 * Structures used for collecting families and qualifiers, de-duplicating them, and sorting them 114 * so they can be passed to the tokenizers. Unlike row keys where we can detect duplicates by 115 * comparing only with the previous row key, families and qualifiers can arrive in unsorted order 116 * in blocks spanning multiple rows. We must collect them all into a set to de-duplicate them. 117 */ 118 protected ByteRangeSet familyDeduplicator; 119 protected ByteRangeSet qualifierDeduplicator; 120 protected ByteRangeSet tagsDeduplicator; 121 /* 122 * Feed sorted byte[]s into these tokenizers which will convert the byte[]s to an in-memory 123 * trie structure with nodes connected by memory pointers (not serializable yet). 124 */ 125 protected Tokenizer rowTokenizer; 126 protected Tokenizer familyTokenizer; 127 protected Tokenizer qualifierTokenizer; 128 protected Tokenizer tagsTokenizer; 129 130 /* 131 * Writers take an in-memory trie, sort the nodes, calculate offsets and lengths, and write 132 * all information to an output stream of bytes that can be stored on disk. 133 */ 134 protected RowSectionWriter rowWriter; 135 protected ColumnSectionWriter familyWriter; 136 protected ColumnSectionWriter qualifierWriter; 137 protected ColumnSectionWriter tagsWriter; 138 139 /* 140 * Integers used for counting cells and bytes. We keep track of the size of the Cells as if they 141 * were full KeyValues because some parts of HBase like to know the "unencoded size". 142 */ 143 protected int totalCells = 0; 144 protected int totalUnencodedBytes = 0;//numBytes if the cells were KeyValues 145 protected int totalValueBytes = 0; 146 protected int totalTagBytes = 0; 147 protected int maxValueLength = 0; 148 protected int maxTagLength = 0; 149 protected int totalBytes = 0;// 150 151 152 /***************** construct ***********************/ 153 PrefixTreeEncoder(OutputStream outputStream, boolean includeMvccVersion)154 public PrefixTreeEncoder(OutputStream outputStream, boolean includeMvccVersion) { 155 // used during cell accumulation 156 this.blockMeta = new PrefixTreeBlockMeta(); 157 this.rowRange = new SimpleMutableByteRange(); 158 this.familyRange = new SimpleMutableByteRange(); 159 this.qualifierRange = new SimpleMutableByteRange(); 160 this.timestamps = new long[INITIAL_PER_CELL_ARRAY_SIZES]; 161 this.mvccVersions = new long[INITIAL_PER_CELL_ARRAY_SIZES]; 162 this.typeBytes = new byte[INITIAL_PER_CELL_ARRAY_SIZES]; 163 this.valueOffsets = new int[INITIAL_PER_CELL_ARRAY_SIZES]; 164 this.values = new byte[VALUE_BUFFER_INIT_SIZE]; 165 166 // used during compilation 167 this.familyDeduplicator = USE_HASH_COLUMN_SORTER ? new ByteRangeHashSet() 168 : new ByteRangeTreeSet(); 169 this.qualifierDeduplicator = USE_HASH_COLUMN_SORTER ? new ByteRangeHashSet() 170 : new ByteRangeTreeSet(); 171 this.timestampEncoder = new LongEncoder(); 172 this.mvccVersionEncoder = new LongEncoder(); 173 this.cellTypeEncoder = new CellTypeEncoder(); 174 this.rowTokenizer = new Tokenizer(); 175 this.familyTokenizer = new Tokenizer(); 176 this.qualifierTokenizer = new Tokenizer(); 177 this.rowWriter = new RowSectionWriter(); 178 this.familyWriter = new ColumnSectionWriter(); 179 this.qualifierWriter = new ColumnSectionWriter(); 180 initializeTagHelpers(); 181 182 reset(outputStream, includeMvccVersion); 183 } 184 reset(OutputStream outputStream, boolean includeMvccVersion)185 public void reset(OutputStream outputStream, boolean includeMvccVersion) { 186 ++numResets; 187 this.includeMvccVersion = includeMvccVersion; 188 this.outputStream = outputStream; 189 valueOffsets[0] = 0; 190 familyDeduplicator.reset(); 191 qualifierDeduplicator.reset(); 192 tagsDeduplicator.reset(); 193 tagsWriter.reset(); 194 tagsTokenizer.reset(); 195 rowTokenizer.reset(); 196 timestampEncoder.reset(); 197 mvccVersionEncoder.reset(); 198 cellTypeEncoder.reset(); 199 familyTokenizer.reset(); 200 qualifierTokenizer.reset(); 201 rowWriter.reset(); 202 familyWriter.reset(); 203 qualifierWriter.reset(); 204 205 totalCells = 0; 206 totalUnencodedBytes = 0; 207 totalValueBytes = 0; 208 maxValueLength = 0; 209 totalBytes = 0; 210 } 211 initializeTagHelpers()212 protected void initializeTagHelpers() { 213 this.tagsRange = new SimpleMutableByteRange(); 214 this.tagsDeduplicator = USE_HASH_COLUMN_SORTER ? new ByteRangeHashSet() 215 : new ByteRangeTreeSet(); 216 this.tagsTokenizer = new Tokenizer(); 217 this.tagsWriter = new ColumnSectionWriter(); 218 } 219 220 /** 221 * Check that the arrays used to hold cell fragments are large enough for the cell that is being 222 * added. Since the PrefixTreeEncoder is cached between uses, these arrays may grow during the 223 * first few block encodings but should stabilize quickly. 224 */ ensurePerCellCapacities()225 protected void ensurePerCellCapacities() { 226 int currentCapacity = valueOffsets.length; 227 int neededCapacity = totalCells + 2;// some things write one index ahead. +2 to be safe 228 if (neededCapacity < currentCapacity) { 229 return; 230 } 231 232 int padding = neededCapacity;//this will double the array size 233 timestamps = ArrayUtils.growIfNecessary(timestamps, neededCapacity, padding); 234 mvccVersions = ArrayUtils.growIfNecessary(mvccVersions, neededCapacity, padding); 235 typeBytes = ArrayUtils.growIfNecessary(typeBytes, neededCapacity, padding); 236 valueOffsets = ArrayUtils.growIfNecessary(valueOffsets, neededCapacity, padding); 237 } 238 239 /******************** CellOutputStream methods *************************/ 240 241 /** 242 * Note: Unused until support is added to the scanner/heap 243 * <p/> 244 * The following method are optimized versions of write(Cell cell). The result should be 245 * identical, however the implementation may be able to execute them much more efficiently because 246 * it does not need to compare the unchanged fields with the previous cell's. 247 * <p/> 248 * Consider the benefits during compaction when paired with a CellScanner that is also aware of 249 * row boundaries. The CellScanner can easily use these methods instead of blindly passing Cells 250 * to the write(Cell cell) method. 251 * <p/> 252 * The savings of skipping duplicate row detection are significant with long row keys. A 253 * DataBlockEncoder may store a row key once in combination with a count of how many cells are in 254 * the row. With a 100 byte row key, we can replace 100 byte comparisons with a single increment 255 * of the counter, and that is for every cell in the row. 256 */ 257 258 /** 259 * Add a Cell to the output stream but repeat the previous row. 260 */ 261 //@Override writeWithRepeatRow(Cell cell)262 public void writeWithRepeatRow(Cell cell) { 263 ensurePerCellCapacities();//can we optimize away some of this? 264 265 //save a relatively expensive row comparison, incrementing the row's counter instead 266 rowTokenizer.incrementNumOccurrencesOfLatestValue(); 267 addFamilyPart(cell); 268 addQualifierPart(cell); 269 addAfterRowFamilyQualifier(cell); 270 } 271 272 273 @Override write(Cell cell)274 public void write(Cell cell) { 275 ensurePerCellCapacities(); 276 277 rowTokenizer.addSorted(CellUtil.fillRowRange(cell, rowRange)); 278 addFamilyPart(cell); 279 addQualifierPart(cell); 280 addTagPart(cell); 281 addAfterRowFamilyQualifier(cell); 282 } 283 284 addTagPart(Cell cell)285 private void addTagPart(Cell cell) { 286 CellUtil.fillTagRange(cell, tagsRange); 287 tagsDeduplicator.add(tagsRange); 288 } 289 290 /***************** internal add methods ************************/ 291 addAfterRowFamilyQualifier(Cell cell)292 private void addAfterRowFamilyQualifier(Cell cell){ 293 // timestamps 294 timestamps[totalCells] = cell.getTimestamp(); 295 timestampEncoder.add(cell.getTimestamp()); 296 297 // memstore timestamps 298 if (includeMvccVersion) { 299 mvccVersions[totalCells] = cell.getMvccVersion(); 300 mvccVersionEncoder.add(cell.getMvccVersion()); 301 totalUnencodedBytes += WritableUtils.getVIntSize(cell.getMvccVersion()); 302 }else{ 303 //must overwrite in case there was a previous version in this array slot 304 mvccVersions[totalCells] = 0L; 305 if(totalCells == 0){//only need to do this for the first cell added 306 mvccVersionEncoder.add(0L); 307 } 308 //totalUncompressedBytes += 0;//mvccVersion takes zero bytes when disabled 309 } 310 311 // types 312 typeBytes[totalCells] = cell.getTypeByte(); 313 cellTypeEncoder.add(cell.getTypeByte()); 314 315 // values 316 totalValueBytes += cell.getValueLength(); 317 // double the array each time we run out of space 318 values = ArrayUtils.growIfNecessary(values, totalValueBytes, 2 * totalValueBytes); 319 CellUtil.copyValueTo(cell, values, valueOffsets[totalCells]); 320 if (cell.getValueLength() > maxValueLength) { 321 maxValueLength = cell.getValueLength(); 322 } 323 valueOffsets[totalCells + 1] = totalValueBytes; 324 325 // general 326 totalUnencodedBytes += KeyValueUtil.length(cell); 327 ++totalCells; 328 } 329 addFamilyPart(Cell cell)330 private void addFamilyPart(Cell cell) { 331 if (MULITPLE_FAMILIES_POSSIBLE || totalCells == 0) { 332 CellUtil.fillFamilyRange(cell, familyRange); 333 familyDeduplicator.add(familyRange); 334 } 335 } 336 addQualifierPart(Cell cell)337 private void addQualifierPart(Cell cell) { 338 CellUtil.fillQualifierRange(cell, qualifierRange); 339 qualifierDeduplicator.add(qualifierRange); 340 } 341 342 343 /****************** compiling/flushing ********************/ 344 345 /** 346 * Expensive method. The second half of the encoding work happens here. 347 * 348 * Take all the separate accumulated data structures and turn them into a single stream of bytes 349 * which is written to the outputStream. 350 */ 351 @Override flush()352 public void flush() throws IOException { 353 compile(); 354 355 // do the actual flushing to the output stream. Order matters. 356 blockMeta.writeVariableBytesToOutputStream(outputStream); 357 rowWriter.writeBytes(outputStream); 358 familyWriter.writeBytes(outputStream); 359 qualifierWriter.writeBytes(outputStream); 360 tagsWriter.writeBytes(outputStream); 361 timestampEncoder.writeBytes(outputStream); 362 mvccVersionEncoder.writeBytes(outputStream); 363 //CellType bytes are in the row nodes. there is no additional type section 364 outputStream.write(values, 0, totalValueBytes); 365 } 366 367 /** 368 * Now that all the cells have been added, do the work to reduce them to a series of byte[] 369 * fragments that are ready to be written to the output stream. 370 */ compile()371 protected void compile(){ 372 blockMeta.setNumKeyValueBytes(totalUnencodedBytes); 373 int lastValueOffset = valueOffsets[totalCells]; 374 blockMeta.setValueOffsetWidth(UFIntTool.numBytes(lastValueOffset)); 375 blockMeta.setValueLengthWidth(UFIntTool.numBytes(maxValueLength)); 376 blockMeta.setNumValueBytes(totalValueBytes); 377 totalBytes += totalTagBytes + totalValueBytes; 378 379 //these compile methods will add to totalBytes 380 compileTypes(); 381 compileMvccVersions(); 382 compileTimestamps(); 383 compileTags(); 384 compileQualifiers(); 385 compileFamilies(); 386 compileRows(); 387 388 int numMetaBytes = blockMeta.calculateNumMetaBytes(); 389 blockMeta.setNumMetaBytes(numMetaBytes); 390 totalBytes += numMetaBytes; 391 } 392 393 /** 394 * <p> 395 * The following "compile" methods do any intermediate work necessary to transform the cell 396 * fragments collected during the writing phase into structures that are ready to write to the 397 * outputStream. 398 * </p> 399 * The family and qualifier treatment is almost identical, as is timestamp and mvccVersion. 400 */ 401 compileTypes()402 protected void compileTypes() { 403 blockMeta.setAllSameType(cellTypeEncoder.areAllSameType()); 404 if(cellTypeEncoder.areAllSameType()){ 405 blockMeta.setAllTypes(cellTypeEncoder.getOnlyType()); 406 } 407 } 408 compileMvccVersions()409 protected void compileMvccVersions() { 410 mvccVersionEncoder.compile(); 411 blockMeta.setMvccVersionFields(mvccVersionEncoder); 412 int numMvccVersionBytes = mvccVersionEncoder.getOutputArrayLength(); 413 totalBytes += numMvccVersionBytes; 414 } 415 compileTimestamps()416 protected void compileTimestamps() { 417 timestampEncoder.compile(); 418 blockMeta.setTimestampFields(timestampEncoder); 419 int numTimestampBytes = timestampEncoder.getOutputArrayLength(); 420 totalBytes += numTimestampBytes; 421 } 422 compileQualifiers()423 protected void compileQualifiers() { 424 blockMeta.setNumUniqueQualifiers(qualifierDeduplicator.size()); 425 qualifierDeduplicator.compile(); 426 qualifierTokenizer.addAll(qualifierDeduplicator.getSortedRanges()); 427 qualifierWriter.reconstruct(blockMeta, qualifierTokenizer, ColumnNodeType.QUALIFIER); 428 qualifierWriter.compile(); 429 int numQualifierBytes = qualifierWriter.getNumBytes(); 430 blockMeta.setNumQualifierBytes(numQualifierBytes); 431 totalBytes += numQualifierBytes; 432 } 433 compileFamilies()434 protected void compileFamilies() { 435 blockMeta.setNumUniqueFamilies(familyDeduplicator.size()); 436 familyDeduplicator.compile(); 437 familyTokenizer.addAll(familyDeduplicator.getSortedRanges()); 438 familyWriter.reconstruct(blockMeta, familyTokenizer, ColumnNodeType.FAMILY); 439 familyWriter.compile(); 440 int numFamilyBytes = familyWriter.getNumBytes(); 441 blockMeta.setNumFamilyBytes(numFamilyBytes); 442 totalBytes += numFamilyBytes; 443 } 444 compileTags()445 protected void compileTags() { 446 blockMeta.setNumUniqueTags(tagsDeduplicator.size()); 447 tagsDeduplicator.compile(); 448 tagsTokenizer.addAll(tagsDeduplicator.getSortedRanges()); 449 tagsWriter.reconstruct(blockMeta, tagsTokenizer, ColumnNodeType.TAGS); 450 tagsWriter.compile(); 451 int numTagBytes = tagsWriter.getNumBytes(); 452 blockMeta.setNumTagsBytes(numTagBytes); 453 totalBytes += numTagBytes; 454 } 455 compileRows()456 protected void compileRows() { 457 rowWriter.reconstruct(this); 458 rowWriter.compile(); 459 int numRowBytes = rowWriter.getNumBytes(); 460 blockMeta.setNumRowBytes(numRowBytes); 461 blockMeta.setRowTreeDepth(rowTokenizer.getTreeDepth()); 462 totalBytes += numRowBytes; 463 } 464 465 /********************* convenience getters ********************************/ 466 getValueOffset(int index)467 public long getValueOffset(int index) { 468 return valueOffsets[index]; 469 } 470 getValueLength(int index)471 public int getValueLength(int index) { 472 return (int) (valueOffsets[index + 1] - valueOffsets[index]); 473 } 474 475 /************************* get/set *************************************/ 476 getBlockMeta()477 public PrefixTreeBlockMeta getBlockMeta() { 478 return blockMeta; 479 } 480 getRowTokenizer()481 public Tokenizer getRowTokenizer() { 482 return rowTokenizer; 483 } 484 getTimestampEncoder()485 public LongEncoder getTimestampEncoder() { 486 return timestampEncoder; 487 } 488 getTotalBytes()489 public int getTotalBytes() { 490 return totalBytes; 491 } 492 getTimestamps()493 public long[] getTimestamps() { 494 return timestamps; 495 } 496 getMvccVersions()497 public long[] getMvccVersions() { 498 return mvccVersions; 499 } 500 getTypeBytes()501 public byte[] getTypeBytes() { 502 return typeBytes; 503 } 504 getMvccVersionEncoder()505 public LongEncoder getMvccVersionEncoder() { 506 return mvccVersionEncoder; 507 } 508 getFamilySorter()509 public ByteRangeSet getFamilySorter() { 510 return familyDeduplicator; 511 } 512 getQualifierSorter()513 public ByteRangeSet getQualifierSorter() { 514 return qualifierDeduplicator; 515 } 516 getTagSorter()517 public ByteRangeSet getTagSorter() { 518 return tagsDeduplicator; 519 } 520 getFamilyWriter()521 public ColumnSectionWriter getFamilyWriter() { 522 return familyWriter; 523 } 524 getQualifierWriter()525 public ColumnSectionWriter getQualifierWriter() { 526 return qualifierWriter; 527 } 528 getTagWriter()529 public ColumnSectionWriter getTagWriter() { 530 return tagsWriter; 531 } 532 getRowWriter()533 public RowSectionWriter getRowWriter() { 534 return rowWriter; 535 } 536 getValueByteRange()537 public ByteRange getValueByteRange() { 538 return new SimpleMutableByteRange(values, 0, totalValueBytes); 539 } 540 541 } 542