1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 20 package org.apache.hadoop.hbase.regionserver; 21 22 import java.lang.management.ManagementFactory; 23 import java.lang.management.RuntimeMXBean; 24 import java.util.ArrayList; 25 import java.util.Collections; 26 import java.util.Iterator; 27 import java.util.List; 28 import java.util.NavigableSet; 29 import java.util.SortedSet; 30 import java.util.concurrent.atomic.AtomicLong; 31 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 import org.apache.hadoop.hbase.classification.InterfaceAudience; 35 import org.apache.hadoop.conf.Configuration; 36 import org.apache.hadoop.hbase.Cell; 37 import org.apache.hadoop.hbase.CellUtil; 38 import org.apache.hadoop.hbase.HBaseConfiguration; 39 import org.apache.hadoop.hbase.HConstants; 40 import org.apache.hadoop.hbase.KeyValue; 41 import org.apache.hadoop.hbase.KeyValueUtil; 42 import org.apache.hadoop.hbase.client.Scan; 43 import org.apache.hadoop.hbase.io.TimeRange; 44 import org.apache.hadoop.hbase.util.ByteRange; 45 import org.apache.hadoop.hbase.util.Bytes; 46 import org.apache.hadoop.hbase.util.ClassSize; 47 import org.apache.hadoop.hbase.util.CollectionBackedScanner; 48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 49 import org.apache.hadoop.hbase.util.ReflectionUtils; 50 import org.apache.htrace.Trace; 51 52 /** 53 * The MemStore holds in-memory modifications to the Store. Modifications 54 * are {@link Cell}s. When asked to flush, current memstore is moved 55 * to snapshot and is cleared. We continue to serve edits out of new memstore 56 * and backing snapshot until flusher reports in that the flush succeeded. At 57 * this point we let the snapshot go. 58 * <p> 59 * The MemStore functions should not be called in parallel. Callers should hold 60 * write and read locks. This is done in {@link HStore}. 61 * </p> 62 * 63 * TODO: Adjust size of the memstore when we remove items because they have 64 * been deleted. 65 * TODO: With new KVSLS, need to make sure we update HeapSize with difference 66 * in KV size. 67 */ 68 @InterfaceAudience.Private 69 public class DefaultMemStore implements MemStore { 70 private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); 71 static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; 72 private static final boolean USEMSLAB_DEFAULT = true; 73 static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; 74 75 private Configuration conf; 76 77 // MemStore. Use a CellSkipListSet rather than SkipListSet because of the 78 // better semantics. The Map will overwrite if passed a key it already had 79 // whereas the Set will not add new Cell if key is same though value might be 80 // different. Value is not important -- just make sure always same 81 // reference passed. 82 volatile CellSkipListSet cellSet; 83 84 // Snapshot of memstore. Made for flusher. 85 volatile CellSkipListSet snapshot; 86 87 final KeyValue.KVComparator comparator; 88 89 // Used to track own heapSize 90 final AtomicLong size; 91 private volatile long snapshotSize; 92 93 // Used to track when to flush 94 volatile long timeOfOldestEdit = Long.MAX_VALUE; 95 96 TimeRangeTracker timeRangeTracker; 97 TimeRangeTracker snapshotTimeRangeTracker; 98 99 volatile MemStoreLAB allocator; 100 volatile MemStoreLAB snapshotAllocator; 101 volatile long snapshotId; 102 volatile boolean tagsPresent; 103 104 /** 105 * Default constructor. Used for tests. 106 */ DefaultMemStore()107 public DefaultMemStore() { 108 this(HBaseConfiguration.create(), KeyValue.COMPARATOR); 109 } 110 111 /** 112 * Constructor. 113 * @param c Comparator 114 */ DefaultMemStore(final Configuration conf, final KeyValue.KVComparator c)115 public DefaultMemStore(final Configuration conf, 116 final KeyValue.KVComparator c) { 117 this.conf = conf; 118 this.comparator = c; 119 this.cellSet = new CellSkipListSet(c); 120 this.snapshot = new CellSkipListSet(c); 121 timeRangeTracker = new TimeRangeTracker(); 122 snapshotTimeRangeTracker = new TimeRangeTracker(); 123 this.size = new AtomicLong(DEEP_OVERHEAD); 124 this.snapshotSize = 0; 125 if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { 126 String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); 127 this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, 128 new Class[] { Configuration.class }, new Object[] { conf }); 129 } else { 130 this.allocator = null; 131 } 132 } 133 dump()134 void dump() { 135 for (Cell cell: this.cellSet) { 136 LOG.info(cell); 137 } 138 for (Cell cell: this.snapshot) { 139 LOG.info(cell); 140 } 141 } 142 143 /** 144 * Creates a snapshot of the current memstore. 145 * Snapshot must be cleared by call to {@link #clearSnapshot(long)} 146 */ 147 @Override snapshot()148 public MemStoreSnapshot snapshot() { 149 // If snapshot currently has entries, then flusher failed or didn't call 150 // cleanup. Log a warning. 151 if (!this.snapshot.isEmpty()) { 152 LOG.warn("Snapshot called again without clearing previous. " + 153 "Doing nothing. Another ongoing flush or did we fail last attempt?"); 154 } else { 155 this.snapshotId = EnvironmentEdgeManager.currentTime(); 156 this.snapshotSize = keySize(); 157 if (!this.cellSet.isEmpty()) { 158 this.snapshot = this.cellSet; 159 this.cellSet = new CellSkipListSet(this.comparator); 160 this.snapshotTimeRangeTracker = this.timeRangeTracker; 161 this.timeRangeTracker = new TimeRangeTracker(); 162 // Reset heap to not include any keys 163 this.size.set(DEEP_OVERHEAD); 164 this.snapshotAllocator = this.allocator; 165 // Reset allocator so we get a fresh buffer for the new memstore 166 if (allocator != null) { 167 String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); 168 this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, 169 new Class[] { Configuration.class }, new Object[] { conf }); 170 } 171 timeOfOldestEdit = Long.MAX_VALUE; 172 } 173 } 174 MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize, 175 this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator), 176 this.tagsPresent); 177 this.tagsPresent = false; 178 return memStoreSnapshot; 179 } 180 181 /** 182 * The passed snapshot was successfully persisted; it can be let go. 183 * @param id Id of the snapshot to clean out. 184 * @throws UnexpectedStateException 185 * @see #snapshot() 186 */ 187 @Override clearSnapshot(long id)188 public void clearSnapshot(long id) throws UnexpectedStateException { 189 MemStoreLAB tmpAllocator = null; 190 if (this.snapshotId != id) { 191 throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " 192 + id); 193 } 194 // OK. Passed in snapshot is same as current snapshot. If not-empty, 195 // create a new snapshot and let the old one go. 196 if (!this.snapshot.isEmpty()) { 197 this.snapshot = new CellSkipListSet(this.comparator); 198 this.snapshotTimeRangeTracker = new TimeRangeTracker(); 199 } 200 this.snapshotSize = 0; 201 this.snapshotId = -1; 202 if (this.snapshotAllocator != null) { 203 tmpAllocator = this.snapshotAllocator; 204 this.snapshotAllocator = null; 205 } 206 if (tmpAllocator != null) { 207 tmpAllocator.close(); 208 } 209 } 210 211 @Override getFlushableSize()212 public long getFlushableSize() { 213 return this.snapshotSize > 0 ? this.snapshotSize : keySize(); 214 } 215 216 @Override getSnapshotSize()217 public long getSnapshotSize() { 218 return this.snapshotSize; 219 } 220 221 /** 222 * Write an update 223 * @param cell 224 * @return approximate size of the passed cell. 225 */ 226 @Override add(Cell cell)227 public long add(Cell cell) { 228 Cell toAdd = maybeCloneWithAllocator(cell); 229 return internalAdd(toAdd); 230 } 231 232 @Override timeOfOldestEdit()233 public long timeOfOldestEdit() { 234 return timeOfOldestEdit; 235 } 236 addToCellSet(Cell e)237 private boolean addToCellSet(Cell e) { 238 boolean b = this.cellSet.add(e); 239 // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. 240 // When we use ACL CP or Visibility CP which deals with Tags during 241 // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not 242 // parse the byte[] to identify the tags length. 243 if(e.getTagsLength() > 0) { 244 tagsPresent = true; 245 } 246 setOldestEditTimeToNow(); 247 return b; 248 } 249 removeFromCellSet(Cell e)250 private boolean removeFromCellSet(Cell e) { 251 boolean b = this.cellSet.remove(e); 252 setOldestEditTimeToNow(); 253 return b; 254 } 255 setOldestEditTimeToNow()256 void setOldestEditTimeToNow() { 257 if (timeOfOldestEdit == Long.MAX_VALUE) { 258 timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); 259 } 260 } 261 262 /** 263 * Internal version of add() that doesn't clone Cells with the 264 * allocator, and doesn't take the lock. 265 * 266 * Callers should ensure they already have the read lock taken 267 */ internalAdd(final Cell toAdd)268 private long internalAdd(final Cell toAdd) { 269 long s = heapSizeChange(toAdd, addToCellSet(toAdd)); 270 timeRangeTracker.includeTimestamp(toAdd); 271 this.size.addAndGet(s); 272 return s; 273 } 274 maybeCloneWithAllocator(Cell cell)275 private Cell maybeCloneWithAllocator(Cell cell) { 276 if (allocator == null) { 277 return cell; 278 } 279 280 int len = KeyValueUtil.length(cell); 281 ByteRange alloc = allocator.allocateBytes(len); 282 if (alloc == null) { 283 // The allocation was too large, allocator decided 284 // not to do anything with it. 285 return cell; 286 } 287 assert alloc.getBytes() != null; 288 KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset()); 289 KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); 290 newKv.setSequenceId(cell.getSequenceId()); 291 return newKv; 292 } 293 294 /** 295 * Remove n key from the memstore. Only cells that have the same key and the 296 * same memstoreTS are removed. It is ok to not update timeRangeTracker 297 * in this call. It is possible that we can optimize this method by using 298 * tailMap/iterator, but since this method is called rarely (only for 299 * error recovery), we can leave those optimization for the future. 300 * @param cell 301 */ 302 @Override rollback(Cell cell)303 public void rollback(Cell cell) { 304 // If the key is in the snapshot, delete it. We should not update 305 // this.size, because that tracks the size of only the memstore and 306 // not the snapshot. The flush of this snapshot to disk has not 307 // yet started because Store.flush() waits for all rwcc transactions to 308 // commit before starting the flush to disk. 309 Cell found = this.snapshot.get(cell); 310 if (found != null && found.getSequenceId() == cell.getSequenceId()) { 311 this.snapshot.remove(cell); 312 long sz = heapSizeChange(cell, true); 313 this.snapshotSize -= sz; 314 } 315 // If the key is in the memstore, delete it. Update this.size. 316 found = this.cellSet.get(cell); 317 if (found != null && found.getSequenceId() == cell.getSequenceId()) { 318 removeFromCellSet(cell); 319 long s = heapSizeChange(cell, true); 320 this.size.addAndGet(-s); 321 } 322 } 323 324 /** 325 * Write a delete 326 * @param deleteCell 327 * @return approximate size of the passed key and value. 328 */ 329 @Override delete(Cell deleteCell)330 public long delete(Cell deleteCell) { 331 long s = 0; 332 Cell toAdd = maybeCloneWithAllocator(deleteCell); 333 s += heapSizeChange(toAdd, addToCellSet(toAdd)); 334 timeRangeTracker.includeTimestamp(toAdd); 335 this.size.addAndGet(s); 336 return s; 337 } 338 339 /** 340 * @param cell Find the row that comes after this one. If null, we return the 341 * first. 342 * @return Next row or null if none found. 343 */ getNextRow(final Cell cell)344 Cell getNextRow(final Cell cell) { 345 return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot)); 346 } 347 348 /* 349 * @param a 350 * @param b 351 * @return Return lowest of a or b or null if both a and b are null 352 */ getLowest(final Cell a, final Cell b)353 private Cell getLowest(final Cell a, final Cell b) { 354 if (a == null) { 355 return b; 356 } 357 if (b == null) { 358 return a; 359 } 360 return comparator.compareRows(a, b) <= 0? a: b; 361 } 362 363 /* 364 * @param key Find row that follows this one. If null, return first. 365 * @param map Set to look in for a row beyond <code>row</code>. 366 * @return Next row or null if none found. If one found, will be a new 367 * KeyValue -- can be destroyed by subsequent calls to this method. 368 */ getNextRow(final Cell key, final NavigableSet<Cell> set)369 private Cell getNextRow(final Cell key, 370 final NavigableSet<Cell> set) { 371 Cell result = null; 372 SortedSet<Cell> tail = key == null? set: set.tailSet(key); 373 // Iterate until we fall into the next row; i.e. move off current row 374 for (Cell cell: tail) { 375 if (comparator.compareRows(cell, key) <= 0) 376 continue; 377 // Note: Not suppressing deletes or expired cells. Needs to be handled 378 // by higher up functions. 379 result = cell; 380 break; 381 } 382 return result; 383 } 384 385 /** 386 * @param state column/delete tracking state 387 */ 388 @Override getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state)389 public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) { 390 getRowKeyAtOrBefore(cellSet, state); 391 getRowKeyAtOrBefore(snapshot, state); 392 } 393 394 /* 395 * @param set 396 * @param state Accumulates deletes and candidates. 397 */ getRowKeyAtOrBefore(final NavigableSet<Cell> set, final GetClosestRowBeforeTracker state)398 private void getRowKeyAtOrBefore(final NavigableSet<Cell> set, 399 final GetClosestRowBeforeTracker state) { 400 if (set.isEmpty()) { 401 return; 402 } 403 if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) { 404 // Found nothing in row. Try backing up. 405 getRowKeyBefore(set, state); 406 } 407 } 408 409 /* 410 * Walk forward in a row from <code>firstOnRow</code>. Presumption is that 411 * we have been passed the first possible key on a row. As we walk forward 412 * we accumulate deletes until we hit a candidate on the row at which point 413 * we return. 414 * @param set 415 * @param firstOnRow First possible key on this row. 416 * @param state 417 * @return True if we found a candidate walking this row. 418 */ walkForwardInSingleRow(final SortedSet<Cell> set, final Cell firstOnRow, final GetClosestRowBeforeTracker state)419 private boolean walkForwardInSingleRow(final SortedSet<Cell> set, 420 final Cell firstOnRow, final GetClosestRowBeforeTracker state) { 421 boolean foundCandidate = false; 422 SortedSet<Cell> tail = set.tailSet(firstOnRow); 423 if (tail.isEmpty()) return foundCandidate; 424 for (Iterator<Cell> i = tail.iterator(); i.hasNext();) { 425 Cell kv = i.next(); 426 // Did we go beyond the target row? If so break. 427 if (state.isTooFar(kv, firstOnRow)) break; 428 if (state.isExpired(kv)) { 429 i.remove(); 430 continue; 431 } 432 // If we added something, this row is a contender. break. 433 if (state.handle(kv)) { 434 foundCandidate = true; 435 break; 436 } 437 } 438 return foundCandidate; 439 } 440 441 /* 442 * Walk backwards through the passed set a row at a time until we run out of 443 * set or until we get a candidate. 444 * @param set 445 * @param state 446 */ getRowKeyBefore(NavigableSet<Cell> set, final GetClosestRowBeforeTracker state)447 private void getRowKeyBefore(NavigableSet<Cell> set, 448 final GetClosestRowBeforeTracker state) { 449 Cell firstOnRow = state.getTargetKey(); 450 for (Member p = memberOfPreviousRow(set, state, firstOnRow); 451 p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) { 452 // Make sure we don't fall out of our table. 453 if (!state.isTargetTable(p.cell)) break; 454 // Stop looking if we've exited the better candidate range. 455 if (!state.isBetterCandidate(p.cell)) break; 456 // Make into firstOnRow 457 firstOnRow = new KeyValue(p.cell.getRowArray(), p.cell.getRowOffset(), p.cell.getRowLength(), 458 HConstants.LATEST_TIMESTAMP); 459 // If we find something, break; 460 if (walkForwardInSingleRow(p.set, firstOnRow, state)) break; 461 } 462 } 463 464 /** 465 * Only used by tests. TODO: Remove 466 * 467 * Given the specs of a column, update it, first by inserting a new record, 468 * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS 469 * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying 470 * store will ensure that the insert/delete each are atomic. A scanner/reader will either 471 * get the new value, or the old value and all readers will eventually only see the new 472 * value after the old was removed. 473 * 474 * @param row 475 * @param family 476 * @param qualifier 477 * @param newValue 478 * @param now 479 * @return Timestamp 480 */ 481 @Override updateColumnValue(byte[] row, byte[] family, byte[] qualifier, long newValue, long now)482 public long updateColumnValue(byte[] row, 483 byte[] family, 484 byte[] qualifier, 485 long newValue, 486 long now) { 487 Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier); 488 // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. 489 SortedSet<Cell> snSs = snapshot.tailSet(firstCell); 490 if (!snSs.isEmpty()) { 491 Cell snc = snSs.first(); 492 // is there a matching Cell in the snapshot? 493 if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) { 494 if (snc.getTimestamp() == now) { 495 // poop, 496 now += 1; 497 } 498 } 499 } 500 501 // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. 502 // But the timestamp should also be max(now, mostRecentTsInMemstore) 503 504 // so we cant add the new Cell w/o knowing what's there already, but we also 505 // want to take this chance to delete some cells. So two loops (sad) 506 507 SortedSet<Cell> ss = cellSet.tailSet(firstCell); 508 for (Cell cell : ss) { 509 // if this isnt the row we are interested in, then bail: 510 if (!CellUtil.matchingColumn(cell, family, qualifier) 511 || !CellUtil.matchingRow(cell, firstCell)) { 512 break; // rows dont match, bail. 513 } 514 515 // if the qualifier matches and it's a put, just RM it out of the cellSet. 516 if (cell.getTypeByte() == KeyValue.Type.Put.getCode() && 517 cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) { 518 now = cell.getTimestamp(); 519 } 520 } 521 522 // create or update (upsert) a new Cell with 523 // 'now' and a 0 memstoreTS == immediately visible 524 List<Cell> cells = new ArrayList<Cell>(1); 525 cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); 526 return upsert(cells, 1L); 527 } 528 529 /** 530 * Update or insert the specified KeyValues. 531 * <p> 532 * For each KeyValue, insert into MemStore. This will atomically upsert the 533 * value for that row/family/qualifier. If a KeyValue did already exist, 534 * it will then be removed. 535 * <p> 536 * Currently the memstoreTS is kept at 0 so as each insert happens, it will 537 * be immediately visible. May want to change this so it is atomic across 538 * all KeyValues. 539 * <p> 540 * This is called under row lock, so Get operations will still see updates 541 * atomically. Scans will only see each KeyValue update as atomic. 542 * 543 * @param cells 544 * @param readpoint readpoint below which we can safely remove duplicate KVs 545 * @return change in memstore size 546 */ 547 @Override upsert(Iterable<Cell> cells, long readpoint)548 public long upsert(Iterable<Cell> cells, long readpoint) { 549 long size = 0; 550 for (Cell cell : cells) { 551 size += upsert(cell, readpoint); 552 } 553 return size; 554 } 555 556 /** 557 * Inserts the specified KeyValue into MemStore and deletes any existing 558 * versions of the same row/family/qualifier as the specified KeyValue. 559 * <p> 560 * First, the specified KeyValue is inserted into the Memstore. 561 * <p> 562 * If there are any existing KeyValues in this MemStore with the same row, 563 * family, and qualifier, they are removed. 564 * <p> 565 * Callers must hold the read lock. 566 * 567 * @param cell 568 * @return change in size of MemStore 569 */ upsert(Cell cell, long readpoint)570 private long upsert(Cell cell, long readpoint) { 571 // Add the Cell to the MemStore 572 // Use the internalAdd method here since we (a) already have a lock 573 // and (b) cannot safely use the MSLAB here without potentially 574 // hitting OOME - see TestMemStore.testUpsertMSLAB for a 575 // test that triggers the pathological case if we don't avoid MSLAB 576 // here. 577 long addedSize = internalAdd(cell); 578 579 // Get the Cells for the row/family/qualifier regardless of timestamp. 580 // For this case we want to clean up any other puts 581 Cell firstCell = KeyValueUtil.createFirstOnRow( 582 cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), 583 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), 584 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); 585 SortedSet<Cell> ss = cellSet.tailSet(firstCell); 586 Iterator<Cell> it = ss.iterator(); 587 // versions visible to oldest scanner 588 int versionsVisible = 0; 589 while ( it.hasNext() ) { 590 Cell cur = it.next(); 591 592 if (cell == cur) { 593 // ignore the one just put in 594 continue; 595 } 596 // check that this is the row and column we are interested in, otherwise bail 597 if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) { 598 // only remove Puts that concurrent scanners cannot possibly see 599 if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && 600 cur.getSequenceId() <= readpoint) { 601 if (versionsVisible >= 1) { 602 // if we get here we have seen at least one version visible to the oldest scanner, 603 // which means we can prove that no scanner will see this version 604 605 // false means there was a change, so give us the size. 606 long delta = heapSizeChange(cur, true); 607 addedSize -= delta; 608 this.size.addAndGet(-delta); 609 it.remove(); 610 setOldestEditTimeToNow(); 611 } else { 612 versionsVisible++; 613 } 614 } 615 } else { 616 // past the row or column, done 617 break; 618 } 619 } 620 return addedSize; 621 } 622 623 /* 624 * Immutable data structure to hold member found in set and the set it was 625 * found in. Include set because it is carrying context. 626 */ 627 private static class Member { 628 final Cell cell; 629 final NavigableSet<Cell> set; Member(final NavigableSet<Cell> s, final Cell kv)630 Member(final NavigableSet<Cell> s, final Cell kv) { 631 this.cell = kv; 632 this.set = s; 633 } 634 } 635 636 /* 637 * @param set Set to walk back in. Pass a first in row or we'll return 638 * same row (loop). 639 * @param state Utility and context. 640 * @param firstOnRow First item on the row after the one we want to find a 641 * member in. 642 * @return Null or member of row previous to <code>firstOnRow</code> 643 */ memberOfPreviousRow(NavigableSet<Cell> set, final GetClosestRowBeforeTracker state, final Cell firstOnRow)644 private Member memberOfPreviousRow(NavigableSet<Cell> set, 645 final GetClosestRowBeforeTracker state, final Cell firstOnRow) { 646 NavigableSet<Cell> head = set.headSet(firstOnRow, false); 647 if (head.isEmpty()) return null; 648 for (Iterator<Cell> i = head.descendingIterator(); i.hasNext();) { 649 Cell found = i.next(); 650 if (state.isExpired(found)) { 651 i.remove(); 652 continue; 653 } 654 return new Member(head, found); 655 } 656 return null; 657 } 658 659 /** 660 * @return scanner on memstore and snapshot in this order. 661 */ 662 @Override getScanners(long readPt)663 public List<KeyValueScanner> getScanners(long readPt) { 664 return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt)); 665 } 666 667 /** 668 * Check if this memstore may contain the required keys 669 * @param scan scan 670 * @param store holds reference to cf 671 * @param oldestUnexpiredTS 672 * @return False if the key definitely does not exist in this Memstore 673 */ shouldSeek(Scan scan, Store store, long oldestUnexpiredTS)674 public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) { 675 byte[] cf = store.getFamily().getName(); 676 TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf); 677 if (timeRange == null) { 678 timeRange = scan.getTimeRange(); 679 } 680 return (timeRangeTracker.includesTimeRange(timeRange) || 681 snapshotTimeRangeTracker.includesTimeRange(timeRange)) 682 && (Math.max(timeRangeTracker.getMaximumTimestamp(), 683 snapshotTimeRangeTracker.getMaximumTimestamp()) >= 684 oldestUnexpiredTS); 685 } 686 687 /* 688 * MemStoreScanner implements the KeyValueScanner. 689 * It lets the caller scan the contents of a memstore -- both current 690 * map and snapshot. 691 * This behaves as if it were a real scanner but does not maintain position. 692 */ 693 protected class MemStoreScanner extends NonLazyKeyValueScanner { 694 // Next row information for either cellSet or snapshot 695 private Cell cellSetNextRow = null; 696 private Cell snapshotNextRow = null; 697 698 // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek) 699 private Cell cellSetItRow = null; 700 private Cell snapshotItRow = null; 701 702 // iterator based scanning. 703 private Iterator<Cell> cellSetIt; 704 private Iterator<Cell> snapshotIt; 705 706 // The cellSet and snapshot at the time of creating this scanner 707 private CellSkipListSet cellSetAtCreation; 708 private CellSkipListSet snapshotAtCreation; 709 710 // the pre-calculated Cell to be returned by peek() or next() 711 private Cell theNext; 712 713 // The allocator and snapshot allocator at the time of creating this scanner 714 volatile MemStoreLAB allocatorAtCreation; 715 volatile MemStoreLAB snapshotAllocatorAtCreation; 716 717 // A flag represents whether could stop skipping Cells for MVCC 718 // if have encountered the next row. Only used for reversed scan 719 private boolean stopSkippingCellsIfNextRow = false; 720 721 private long readPoint; 722 723 /* 724 Some notes... 725 726 So memstorescanner is fixed at creation time. this includes pointers/iterators into 727 existing kvset/snapshot. during a snapshot creation, the kvset is null, and the 728 snapshot is moved. since kvset is null there is no point on reseeking on both, 729 we can save us the trouble. During the snapshot->hfile transition, the memstore 730 scanner is re-created by StoreScanner#updateReaders(). StoreScanner should 731 potentially do something smarter by adjusting the existing memstore scanner. 732 733 But there is a greater problem here, that being once a scanner has progressed 734 during a snapshot scenario, we currently iterate past the kvset then 'finish' up. 735 if a scan lasts a little while, there is a chance for new entries in kvset to 736 become available but we will never see them. This needs to be handled at the 737 StoreScanner level with coordination with MemStoreScanner. 738 739 Currently, this problem is only partly managed: during the small amount of time 740 when the StoreScanner has not yet created a new MemStoreScanner, we will miss 741 the adds to kvset in the MemStoreScanner. 742 */ 743 MemStoreScanner(long readPoint)744 MemStoreScanner(long readPoint) { 745 super(); 746 747 this.readPoint = readPoint; 748 cellSetAtCreation = cellSet; 749 snapshotAtCreation = snapshot; 750 if (allocator != null) { 751 this.allocatorAtCreation = allocator; 752 this.allocatorAtCreation.incScannerCount(); 753 } 754 if (snapshotAllocator != null) { 755 this.snapshotAllocatorAtCreation = snapshotAllocator; 756 this.snapshotAllocatorAtCreation.incScannerCount(); 757 } 758 if (Trace.isTracing() && Trace.currentSpan() != null) { 759 Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); 760 } 761 } 762 763 /** 764 * Lock on 'this' must be held by caller. 765 * @param it 766 * @return Next Cell 767 */ getNext(Iterator<Cell> it)768 private Cell getNext(Iterator<Cell> it) { 769 Cell startCell = theNext; 770 Cell v = null; 771 try { 772 while (it.hasNext()) { 773 v = it.next(); 774 if (v.getSequenceId() <= this.readPoint) { 775 return v; 776 } 777 if (stopSkippingCellsIfNextRow && startCell != null 778 && comparator.compareRows(v, startCell) > 0) { 779 return null; 780 } 781 } 782 783 return null; 784 } finally { 785 if (v != null) { 786 // in all cases, remember the last Cell iterated to 787 if (it == snapshotIt) { 788 snapshotItRow = v; 789 } else { 790 cellSetItRow = v; 791 } 792 } 793 } 794 } 795 796 /** 797 * Set the scanner at the seek key. 798 * Must be called only once: there is no thread safety between the scanner 799 * and the memStore. 800 * @param key seek value 801 * @return false if the key is null or if there is no data 802 */ 803 @Override seek(Cell key)804 public synchronized boolean seek(Cell key) { 805 if (key == null) { 806 close(); 807 return false; 808 } 809 // kvset and snapshot will never be null. 810 // if tailSet can't find anything, SortedSet is empty (not null). 811 cellSetIt = cellSetAtCreation.tailSet(key).iterator(); 812 snapshotIt = snapshotAtCreation.tailSet(key).iterator(); 813 cellSetItRow = null; 814 snapshotItRow = null; 815 816 return seekInSubLists(key); 817 } 818 819 820 /** 821 * (Re)initialize the iterators after a seek or a reseek. 822 */ seekInSubLists(Cell key)823 private synchronized boolean seekInSubLists(Cell key){ 824 cellSetNextRow = getNext(cellSetIt); 825 snapshotNextRow = getNext(snapshotIt); 826 827 // Calculate the next value 828 theNext = getLowest(cellSetNextRow, snapshotNextRow); 829 830 // has data 831 return (theNext != null); 832 } 833 834 835 /** 836 * Move forward on the sub-lists set previously by seek. 837 * @param key seek value (should be non-null) 838 * @return true if there is at least one KV to read, false otherwise 839 */ 840 @Override reseek(Cell key)841 public synchronized boolean reseek(Cell key) { 842 /* 843 See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. 844 This code is executed concurrently with flush and puts, without locks. 845 Two points must be known when working on this code: 846 1) It's not possible to use the 'kvTail' and 'snapshot' 847 variables, as they are modified during a flush. 848 2) The ideal implementation for performance would use the sub skip list 849 implicitly pointed by the iterators 'kvsetIt' and 850 'snapshotIt'. Unfortunately the Java API does not offer a method to 851 get it. So we remember the last keys we iterated to and restore 852 the reseeked set to at least that point. 853 */ 854 cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator(); 855 snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); 856 857 return seekInSubLists(key); 858 } 859 860 861 @Override peek()862 public synchronized Cell peek() { 863 //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest()); 864 return theNext; 865 } 866 867 @Override next()868 public synchronized Cell next() { 869 if (theNext == null) { 870 return null; 871 } 872 873 final Cell ret = theNext; 874 875 // Advance one of the iterators 876 if (theNext == cellSetNextRow) { 877 cellSetNextRow = getNext(cellSetIt); 878 } else { 879 snapshotNextRow = getNext(snapshotIt); 880 } 881 882 // Calculate the next value 883 theNext = getLowest(cellSetNextRow, snapshotNextRow); 884 885 //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); 886 //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + 887 // getLowest() + " threadpoint=" + readpoint); 888 return ret; 889 } 890 891 /* 892 * Returns the lower of the two key values, or null if they are both null. 893 * This uses comparator.compare() to compare the KeyValue using the memstore 894 * comparator. 895 */ getLowest(Cell first, Cell second)896 private Cell getLowest(Cell first, Cell second) { 897 if (first == null && second == null) { 898 return null; 899 } 900 if (first != null && second != null) { 901 int compare = comparator.compare(first, second); 902 return (compare <= 0 ? first : second); 903 } 904 return (first != null ? first : second); 905 } 906 907 /* 908 * Returns the higher of the two cells, or null if they are both null. 909 * This uses comparator.compare() to compare the Cell using the memstore 910 * comparator. 911 */ getHighest(Cell first, Cell second)912 private Cell getHighest(Cell first, Cell second) { 913 if (first == null && second == null) { 914 return null; 915 } 916 if (first != null && second != null) { 917 int compare = comparator.compare(first, second); 918 return (compare > 0 ? first : second); 919 } 920 return (first != null ? first : second); 921 } 922 close()923 public synchronized void close() { 924 this.cellSetNextRow = null; 925 this.snapshotNextRow = null; 926 927 this.cellSetIt = null; 928 this.snapshotIt = null; 929 930 if (allocatorAtCreation != null) { 931 this.allocatorAtCreation.decScannerCount(); 932 this.allocatorAtCreation = null; 933 } 934 if (snapshotAllocatorAtCreation != null) { 935 this.snapshotAllocatorAtCreation.decScannerCount(); 936 this.snapshotAllocatorAtCreation = null; 937 } 938 939 this.cellSetItRow = null; 940 this.snapshotItRow = null; 941 } 942 943 /** 944 * MemStoreScanner returns max value as sequence id because it will 945 * always have the latest data among all files. 946 */ 947 @Override getSequenceID()948 public long getSequenceID() { 949 return Long.MAX_VALUE; 950 } 951 952 @Override shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS)953 public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { 954 return shouldSeek(scan, store, oldestUnexpiredTS); 955 } 956 957 /** 958 * Seek scanner to the given key first. If it returns false(means 959 * peek()==null) or scanner's peek row is bigger than row of given key, seek 960 * the scanner to the previous row of given key 961 */ 962 @Override backwardSeek(Cell key)963 public synchronized boolean backwardSeek(Cell key) { 964 seek(key); 965 if (peek() == null || comparator.compareRows(peek(), key) > 0) { 966 return seekToPreviousRow(key); 967 } 968 return true; 969 } 970 971 /** 972 * Separately get the KeyValue before the specified key from kvset and 973 * snapshotset, and use the row of higher one as the previous row of 974 * specified key, then seek to the first KeyValue of previous row 975 */ 976 @Override seekToPreviousRow(Cell key)977 public synchronized boolean seekToPreviousRow(Cell key) { 978 Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(), 979 key.getRowLength()); 980 SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow); 981 Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last(); 982 SortedSet<Cell> snapshotHead = snapshotAtCreation 983 .headSet(firstKeyOnRow); 984 Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead 985 .last(); 986 Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow); 987 if (lastCellBeforeRow == null) { 988 theNext = null; 989 return false; 990 } 991 Cell firstKeyOnPreviousRow = KeyValueUtil.createFirstOnRow(lastCellBeforeRow.getRowArray(), 992 lastCellBeforeRow.getRowOffset(), lastCellBeforeRow.getRowLength()); 993 this.stopSkippingCellsIfNextRow = true; 994 seek(firstKeyOnPreviousRow); 995 this.stopSkippingCellsIfNextRow = false; 996 if (peek() == null 997 || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { 998 return seekToPreviousRow(lastCellBeforeRow); 999 } 1000 return true; 1001 } 1002 1003 @Override seekToLastRow()1004 public synchronized boolean seekToLastRow() { 1005 Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation 1006 .last(); 1007 Cell second = snapshotAtCreation.isEmpty() ? null 1008 : snapshotAtCreation.last(); 1009 Cell higherCell = getHighest(first, second); 1010 if (higherCell == null) { 1011 return false; 1012 } 1013 Cell firstCellOnLastRow = KeyValueUtil.createFirstOnRow(higherCell.getRowArray(), 1014 higherCell.getRowOffset(), higherCell.getRowLength()); 1015 if (seek(firstCellOnLastRow)) { 1016 return true; 1017 } else { 1018 return seekToPreviousRow(higherCell); 1019 } 1020 1021 } 1022 } 1023 1024 public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT 1025 + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); 1026 1027 public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + 1028 ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) + 1029 (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); 1030 1031 /* 1032 * Calculate how the MemStore size has changed. Includes overhead of the 1033 * backing Map. 1034 * @param cell 1035 * @param notpresent True if the cell was NOT present in the set. 1036 * @return Size 1037 */ heapSizeChange(final Cell cell, final boolean notpresent)1038 static long heapSizeChange(final Cell cell, final boolean notpresent) { 1039 return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY 1040 + CellUtil.estimatedHeapSizeOf(cell)) : 0; 1041 } 1042 keySize()1043 private long keySize() { 1044 return heapSize() - DEEP_OVERHEAD; 1045 } 1046 1047 /** 1048 * Get the entire heap usage for this MemStore not including keys in the 1049 * snapshot. 1050 */ 1051 @Override heapSize()1052 public long heapSize() { 1053 return size.get(); 1054 } 1055 1056 @Override size()1057 public long size() { 1058 return heapSize(); 1059 } 1060 1061 /** 1062 * Code to help figure if our approximation of object heap sizes is close 1063 * enough. See hbase-900. Fills memstores then waits so user can heap 1064 * dump and bring up resultant hprof in something like jprofiler which 1065 * allows you get 'deep size' on objects. 1066 * @param args main args 1067 */ main(String [] args)1068 public static void main(String [] args) { 1069 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 1070 LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" + 1071 runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion()); 1072 LOG.info("vmInputArguments=" + runtime.getInputArguments()); 1073 DefaultMemStore memstore1 = new DefaultMemStore(); 1074 // TODO: x32 vs x64 1075 long size = 0; 1076 final int count = 10000; 1077 byte [] fam = Bytes.toBytes("col"); 1078 byte [] qf = Bytes.toBytes("umn"); 1079 byte [] empty = new byte[0]; 1080 for (int i = 0; i < count; i++) { 1081 // Give each its own ts 1082 size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); 1083 } 1084 LOG.info("memstore1 estimated size=" + size); 1085 for (int i = 0; i < count; i++) { 1086 size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); 1087 } 1088 LOG.info("memstore1 estimated size (2nd loading of same data)=" + size); 1089 // Make a variably sized memstore. 1090 DefaultMemStore memstore2 = new DefaultMemStore(); 1091 for (int i = 0; i < count; i++) { 1092 size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i])); 1093 } 1094 LOG.info("memstore2 estimated size=" + size); 1095 final int seconds = 30; 1096 LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); 1097 for (int i = 0; i < seconds; i++) { 1098 // Thread.sleep(1000); 1099 } 1100 LOG.info("Exiting."); 1101 } 1102 1103 } 1104