1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 
20 package org.apache.hadoop.hbase.regionserver;
21 
22 import java.lang.management.ManagementFactory;
23 import java.lang.management.RuntimeMXBean;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.NavigableSet;
29 import java.util.SortedSet;
30 import java.util.concurrent.atomic.AtomicLong;
31 
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.KeyValueUtil;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.io.TimeRange;
44 import org.apache.hadoop.hbase.util.ByteRange;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.ClassSize;
47 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49 import org.apache.hadoop.hbase.util.ReflectionUtils;
50 import org.apache.htrace.Trace;
51 
52 /**
53  * The MemStore holds in-memory modifications to the Store.  Modifications
54  * are {@link Cell}s.  When asked to flush, current memstore is moved
55  * to snapshot and is cleared.  We continue to serve edits out of new memstore
56  * and backing snapshot until flusher reports in that the flush succeeded. At
57  * this point we let the snapshot go.
58  *  <p>
59  * The MemStore functions should not be called in parallel. Callers should hold
60  *  write and read locks. This is done in {@link HStore}.
61  *  </p>
62  *
63  * TODO: Adjust size of the memstore when we remove items because they have
64  * been deleted.
65  * TODO: With new KVSLS, need to make sure we update HeapSize with difference
66  * in KV size.
67  */
68 @InterfaceAudience.Private
69 public class DefaultMemStore implements MemStore {
  private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
  // Configuration key read by the constructor to decide whether an allocator is created.
  static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
  private static final boolean USEMSLAB_DEFAULT = true;
  // Configuration key naming the MemStoreLAB class to instantiate; the code falls
  // back to HeapMemStoreLAB when it is not set.
  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";

  // Retained because snapshot() needs it to instantiate a fresh allocator.
  private Configuration conf;

  // MemStore.  Use a CellSkipListSet rather than SkipListSet because of the
  // better semantics.  The Map will overwrite if passed a key it already had
  // whereas the Set will not add new Cell if key is same though value might be
  // different.  Value is not important -- just make sure always same
  // reference passed.
  volatile CellSkipListSet cellSet;

  // Snapshot of memstore.  Made for flusher.
  volatile CellSkipListSet snapshot;

  // Ordering used for both cellSet and snapshot.
  final KeyValue.KVComparator comparator;

  // Used to track own heapSize. Starts at DEEP_OVERHEAD and is reset to it by snapshot().
  final AtomicLong size;
  // Set from keySize() when a snapshot is taken; zeroed by clearSnapshot().
  private volatile long snapshotSize;

  // Used to track when to flush. Long.MAX_VALUE marks "no edit recorded yet";
  // see setOldestEditTimeToNow().
  volatile long timeOfOldestEdit = Long.MAX_VALUE;

  // Timestamps of cells added to cellSet; handed over to snapshotTimeRangeTracker
  // when a snapshot is taken.
  TimeRangeTracker timeRangeTracker;
  TimeRangeTracker snapshotTimeRangeTracker;

  // Allocator backing cellSet (null when disabled by configuration) and the
  // one that backed the cells now held in snapshot.
  volatile MemStoreLAB allocator;
  volatile MemStoreLAB snapshotAllocator;
  // Assigned the current time by snapshot(); set to -1 by clearSnapshot().
  volatile long snapshotId;
  // Set to true by addToCellSet() when a cell reports a tags length > 0;
  // reset to false at the end of snapshot().
  volatile boolean tagsPresent;
103 
104   /**
105    * Default constructor. Used for tests.
106    */
DefaultMemStore()107   public DefaultMemStore() {
108     this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
109   }
110 
111   /**
112    * Constructor.
113    * @param c Comparator
114    */
DefaultMemStore(final Configuration conf, final KeyValue.KVComparator c)115   public DefaultMemStore(final Configuration conf,
116                   final KeyValue.KVComparator c) {
117     this.conf = conf;
118     this.comparator = c;
119     this.cellSet = new CellSkipListSet(c);
120     this.snapshot = new CellSkipListSet(c);
121     timeRangeTracker = new TimeRangeTracker();
122     snapshotTimeRangeTracker = new TimeRangeTracker();
123     this.size = new AtomicLong(DEEP_OVERHEAD);
124     this.snapshotSize = 0;
125     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
126       String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
127       this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
128           new Class[] { Configuration.class }, new Object[] { conf });
129     } else {
130       this.allocator = null;
131     }
132   }
133 
dump()134   void dump() {
135     for (Cell cell: this.cellSet) {
136       LOG.info(cell);
137     }
138     for (Cell cell: this.snapshot) {
139       LOG.info(cell);
140     }
141   }
142 
143   /**
144    * Creates a snapshot of the current memstore.
145    * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
146    */
147   @Override
snapshot()148   public MemStoreSnapshot snapshot() {
149     // If snapshot currently has entries, then flusher failed or didn't call
150     // cleanup.  Log a warning.
151     if (!this.snapshot.isEmpty()) {
152       LOG.warn("Snapshot called again without clearing previous. " +
153           "Doing nothing. Another ongoing flush or did we fail last attempt?");
154     } else {
155       this.snapshotId = EnvironmentEdgeManager.currentTime();
156       this.snapshotSize = keySize();
157       if (!this.cellSet.isEmpty()) {
158         this.snapshot = this.cellSet;
159         this.cellSet = new CellSkipListSet(this.comparator);
160         this.snapshotTimeRangeTracker = this.timeRangeTracker;
161         this.timeRangeTracker = new TimeRangeTracker();
162         // Reset heap to not include any keys
163         this.size.set(DEEP_OVERHEAD);
164         this.snapshotAllocator = this.allocator;
165         // Reset allocator so we get a fresh buffer for the new memstore
166         if (allocator != null) {
167           String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
168           this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
169               new Class[] { Configuration.class }, new Object[] { conf });
170         }
171         timeOfOldestEdit = Long.MAX_VALUE;
172       }
173     }
174     MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
175         this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
176         this.tagsPresent);
177     this.tagsPresent = false;
178     return memStoreSnapshot;
179   }
180 
181   /**
182    * The passed snapshot was successfully persisted; it can be let go.
183    * @param id Id of the snapshot to clean out.
184    * @throws UnexpectedStateException
185    * @see #snapshot()
186    */
187   @Override
clearSnapshot(long id)188   public void clearSnapshot(long id) throws UnexpectedStateException {
189     MemStoreLAB tmpAllocator = null;
190     if (this.snapshotId != id) {
191       throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
192           + id);
193     }
194     // OK. Passed in snapshot is same as current snapshot. If not-empty,
195     // create a new snapshot and let the old one go.
196     if (!this.snapshot.isEmpty()) {
197       this.snapshot = new CellSkipListSet(this.comparator);
198       this.snapshotTimeRangeTracker = new TimeRangeTracker();
199     }
200     this.snapshotSize = 0;
201     this.snapshotId = -1;
202     if (this.snapshotAllocator != null) {
203       tmpAllocator = this.snapshotAllocator;
204       this.snapshotAllocator = null;
205     }
206     if (tmpAllocator != null) {
207       tmpAllocator.close();
208     }
209   }
210 
211   @Override
getFlushableSize()212   public long getFlushableSize() {
213     return this.snapshotSize > 0 ? this.snapshotSize : keySize();
214   }
215 
216   @Override
getSnapshotSize()217   public long getSnapshotSize() {
218     return this.snapshotSize;
219   }
220 
221   /**
222    * Write an update
223    * @param cell
224    * @return approximate size of the passed cell.
225    */
226   @Override
add(Cell cell)227   public long add(Cell cell) {
228     Cell toAdd = maybeCloneWithAllocator(cell);
229     return internalAdd(toAdd);
230   }
231 
232   @Override
timeOfOldestEdit()233   public long timeOfOldestEdit() {
234     return timeOfOldestEdit;
235   }
236 
addToCellSet(Cell e)237   private boolean addToCellSet(Cell e) {
238     boolean b = this.cellSet.add(e);
239     // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
240     // When we use ACL CP or Visibility CP which deals with Tags during
241     // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
242     // parse the byte[] to identify the tags length.
243     if(e.getTagsLength() > 0) {
244       tagsPresent = true;
245     }
246     setOldestEditTimeToNow();
247     return b;
248   }
249 
removeFromCellSet(Cell e)250   private boolean removeFromCellSet(Cell e) {
251     boolean b = this.cellSet.remove(e);
252     setOldestEditTimeToNow();
253     return b;
254   }
255 
setOldestEditTimeToNow()256   void setOldestEditTimeToNow() {
257     if (timeOfOldestEdit == Long.MAX_VALUE) {
258       timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
259     }
260   }
261 
262   /**
263    * Internal version of add() that doesn't clone Cells with the
264    * allocator, and doesn't take the lock.
265    *
266    * Callers should ensure they already have the read lock taken
267    */
internalAdd(final Cell toAdd)268   private long internalAdd(final Cell toAdd) {
269     long s = heapSizeChange(toAdd, addToCellSet(toAdd));
270     timeRangeTracker.includeTimestamp(toAdd);
271     this.size.addAndGet(s);
272     return s;
273   }
274 
maybeCloneWithAllocator(Cell cell)275   private Cell maybeCloneWithAllocator(Cell cell) {
276     if (allocator == null) {
277       return cell;
278     }
279 
280     int len = KeyValueUtil.length(cell);
281     ByteRange alloc = allocator.allocateBytes(len);
282     if (alloc == null) {
283       // The allocation was too large, allocator decided
284       // not to do anything with it.
285       return cell;
286     }
287     assert alloc.getBytes() != null;
288     KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
289     KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
290     newKv.setSequenceId(cell.getSequenceId());
291     return newKv;
292   }
293 
294   /**
295    * Remove n key from the memstore. Only cells that have the same key and the
296    * same memstoreTS are removed.  It is ok to not update timeRangeTracker
297    * in this call. It is possible that we can optimize this method by using
298    * tailMap/iterator, but since this method is called rarely (only for
299    * error recovery), we can leave those optimization for the future.
300    * @param cell
301    */
302   @Override
rollback(Cell cell)303   public void rollback(Cell cell) {
304     // If the key is in the snapshot, delete it. We should not update
305     // this.size, because that tracks the size of only the memstore and
306     // not the snapshot. The flush of this snapshot to disk has not
307     // yet started because Store.flush() waits for all rwcc transactions to
308     // commit before starting the flush to disk.
309     Cell found = this.snapshot.get(cell);
310     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
311       this.snapshot.remove(cell);
312       long sz = heapSizeChange(cell, true);
313       this.snapshotSize -= sz;
314     }
315     // If the key is in the memstore, delete it. Update this.size.
316     found = this.cellSet.get(cell);
317     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
318       removeFromCellSet(cell);
319       long s = heapSizeChange(cell, true);
320       this.size.addAndGet(-s);
321     }
322   }
323 
324   /**
325    * Write a delete
326    * @param deleteCell
327    * @return approximate size of the passed key and value.
328    */
329   @Override
delete(Cell deleteCell)330   public long delete(Cell deleteCell) {
331     long s = 0;
332     Cell toAdd = maybeCloneWithAllocator(deleteCell);
333     s += heapSizeChange(toAdd, addToCellSet(toAdd));
334     timeRangeTracker.includeTimestamp(toAdd);
335     this.size.addAndGet(s);
336     return s;
337   }
338 
339   /**
340    * @param cell Find the row that comes after this one.  If null, we return the
341    * first.
342    * @return Next row or null if none found.
343    */
getNextRow(final Cell cell)344   Cell getNextRow(final Cell cell) {
345     return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
346   }
347 
348   /*
349    * @param a
350    * @param b
351    * @return Return lowest of a or b or null if both a and b are null
352    */
getLowest(final Cell a, final Cell b)353   private Cell getLowest(final Cell a, final Cell b) {
354     if (a == null) {
355       return b;
356     }
357     if (b == null) {
358       return a;
359     }
360     return comparator.compareRows(a, b) <= 0? a: b;
361   }
362 
363   /*
364    * @param key Find row that follows this one.  If null, return first.
365    * @param map Set to look in for a row beyond <code>row</code>.
366    * @return Next row or null if none found.  If one found, will be a new
367    * KeyValue -- can be destroyed by subsequent calls to this method.
368    */
getNextRow(final Cell key, final NavigableSet<Cell> set)369   private Cell getNextRow(final Cell key,
370       final NavigableSet<Cell> set) {
371     Cell result = null;
372     SortedSet<Cell> tail = key == null? set: set.tailSet(key);
373     // Iterate until we fall into the next row; i.e. move off current row
374     for (Cell cell: tail) {
375       if (comparator.compareRows(cell, key) <= 0)
376         continue;
377       // Note: Not suppressing deletes or expired cells.  Needs to be handled
378       // by higher up functions.
379       result = cell;
380       break;
381     }
382     return result;
383   }
384 
385   /**
386    * @param state column/delete tracking state
387    */
388   @Override
getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state)389   public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
390     getRowKeyAtOrBefore(cellSet, state);
391     getRowKeyAtOrBefore(snapshot, state);
392   }
393 
394   /*
395    * @param set
396    * @param state Accumulates deletes and candidates.
397    */
getRowKeyAtOrBefore(final NavigableSet<Cell> set, final GetClosestRowBeforeTracker state)398   private void getRowKeyAtOrBefore(final NavigableSet<Cell> set,
399       final GetClosestRowBeforeTracker state) {
400     if (set.isEmpty()) {
401       return;
402     }
403     if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
404       // Found nothing in row.  Try backing up.
405       getRowKeyBefore(set, state);
406     }
407   }
408 
409   /*
410    * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
411    * we have been passed the first possible key on a row.  As we walk forward
412    * we accumulate deletes until we hit a candidate on the row at which point
413    * we return.
414    * @param set
415    * @param firstOnRow First possible key on this row.
416    * @param state
417    * @return True if we found a candidate walking this row.
418    */
walkForwardInSingleRow(final SortedSet<Cell> set, final Cell firstOnRow, final GetClosestRowBeforeTracker state)419   private boolean walkForwardInSingleRow(final SortedSet<Cell> set,
420       final Cell firstOnRow, final GetClosestRowBeforeTracker state) {
421     boolean foundCandidate = false;
422     SortedSet<Cell> tail = set.tailSet(firstOnRow);
423     if (tail.isEmpty()) return foundCandidate;
424     for (Iterator<Cell> i = tail.iterator(); i.hasNext();) {
425       Cell kv = i.next();
426       // Did we go beyond the target row? If so break.
427       if (state.isTooFar(kv, firstOnRow)) break;
428       if (state.isExpired(kv)) {
429         i.remove();
430         continue;
431       }
432       // If we added something, this row is a contender. break.
433       if (state.handle(kv)) {
434         foundCandidate = true;
435         break;
436       }
437     }
438     return foundCandidate;
439   }
440 
441   /*
442    * Walk backwards through the passed set a row at a time until we run out of
443    * set or until we get a candidate.
444    * @param set
445    * @param state
446    */
getRowKeyBefore(NavigableSet<Cell> set, final GetClosestRowBeforeTracker state)447   private void getRowKeyBefore(NavigableSet<Cell> set,
448       final GetClosestRowBeforeTracker state) {
449     Cell firstOnRow = state.getTargetKey();
450     for (Member p = memberOfPreviousRow(set, state, firstOnRow);
451         p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
452       // Make sure we don't fall out of our table.
453       if (!state.isTargetTable(p.cell)) break;
454       // Stop looking if we've exited the better candidate range.
455       if (!state.isBetterCandidate(p.cell)) break;
456       // Make into firstOnRow
457       firstOnRow = new KeyValue(p.cell.getRowArray(), p.cell.getRowOffset(), p.cell.getRowLength(),
458           HConstants.LATEST_TIMESTAMP);
459       // If we find something, break;
460       if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
461     }
462   }
463 
464   /**
465    * Only used by tests. TODO: Remove
466    *
467    * Given the specs of a column, update it, first by inserting a new record,
468    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
469    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
470    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
471    * get the new value, or the old value and all readers will eventually only see the new
472    * value after the old was removed.
473    *
474    * @param row
475    * @param family
476    * @param qualifier
477    * @param newValue
478    * @param now
479    * @return  Timestamp
480    */
481   @Override
updateColumnValue(byte[] row, byte[] family, byte[] qualifier, long newValue, long now)482   public long updateColumnValue(byte[] row,
483                                 byte[] family,
484                                 byte[] qualifier,
485                                 long newValue,
486                                 long now) {
487     Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
488     // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
489     SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
490     if (!snSs.isEmpty()) {
491       Cell snc = snSs.first();
492       // is there a matching Cell in the snapshot?
493       if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
494         if (snc.getTimestamp() == now) {
495           // poop,
496           now += 1;
497         }
498       }
499     }
500 
501     // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
502     // But the timestamp should also be max(now, mostRecentTsInMemstore)
503 
504     // so we cant add the new Cell w/o knowing what's there already, but we also
505     // want to take this chance to delete some cells. So two loops (sad)
506 
507     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
508     for (Cell cell : ss) {
509       // if this isnt the row we are interested in, then bail:
510       if (!CellUtil.matchingColumn(cell, family, qualifier)
511           || !CellUtil.matchingRow(cell, firstCell)) {
512         break; // rows dont match, bail.
513       }
514 
515       // if the qualifier matches and it's a put, just RM it out of the cellSet.
516       if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
517           cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
518         now = cell.getTimestamp();
519       }
520     }
521 
522     // create or update (upsert) a new Cell with
523     // 'now' and a 0 memstoreTS == immediately visible
524     List<Cell> cells = new ArrayList<Cell>(1);
525     cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
526     return upsert(cells, 1L);
527   }
528 
529   /**
530    * Update or insert the specified KeyValues.
531    * <p>
532    * For each KeyValue, insert into MemStore.  This will atomically upsert the
533    * value for that row/family/qualifier.  If a KeyValue did already exist,
534    * it will then be removed.
535    * <p>
536    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
537    * be immediately visible.  May want to change this so it is atomic across
538    * all KeyValues.
539    * <p>
540    * This is called under row lock, so Get operations will still see updates
541    * atomically.  Scans will only see each KeyValue update as atomic.
542    *
543    * @param cells
544    * @param readpoint readpoint below which we can safely remove duplicate KVs
545    * @return change in memstore size
546    */
547   @Override
upsert(Iterable<Cell> cells, long readpoint)548   public long upsert(Iterable<Cell> cells, long readpoint) {
549     long size = 0;
550     for (Cell cell : cells) {
551       size += upsert(cell, readpoint);
552     }
553     return size;
554   }
555 
556   /**
557    * Inserts the specified KeyValue into MemStore and deletes any existing
558    * versions of the same row/family/qualifier as the specified KeyValue.
559    * <p>
560    * First, the specified KeyValue is inserted into the Memstore.
561    * <p>
562    * If there are any existing KeyValues in this MemStore with the same row,
563    * family, and qualifier, they are removed.
564    * <p>
565    * Callers must hold the read lock.
566    *
567    * @param cell
568    * @return change in size of MemStore
569    */
upsert(Cell cell, long readpoint)570   private long upsert(Cell cell, long readpoint) {
571     // Add the Cell to the MemStore
572     // Use the internalAdd method here since we (a) already have a lock
573     // and (b) cannot safely use the MSLAB here without potentially
574     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
575     // test that triggers the pathological case if we don't avoid MSLAB
576     // here.
577     long addedSize = internalAdd(cell);
578 
579     // Get the Cells for the row/family/qualifier regardless of timestamp.
580     // For this case we want to clean up any other puts
581     Cell firstCell = KeyValueUtil.createFirstOnRow(
582         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
583         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
584         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
585     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
586     Iterator<Cell> it = ss.iterator();
587     // versions visible to oldest scanner
588     int versionsVisible = 0;
589     while ( it.hasNext() ) {
590       Cell cur = it.next();
591 
592       if (cell == cur) {
593         // ignore the one just put in
594         continue;
595       }
596       // check that this is the row and column we are interested in, otherwise bail
597       if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
598         // only remove Puts that concurrent scanners cannot possibly see
599         if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
600             cur.getSequenceId() <= readpoint) {
601           if (versionsVisible >= 1) {
602             // if we get here we have seen at least one version visible to the oldest scanner,
603             // which means we can prove that no scanner will see this version
604 
605             // false means there was a change, so give us the size.
606             long delta = heapSizeChange(cur, true);
607             addedSize -= delta;
608             this.size.addAndGet(-delta);
609             it.remove();
610             setOldestEditTimeToNow();
611           } else {
612             versionsVisible++;
613           }
614         }
615       } else {
616         // past the row or column, done
617         break;
618       }
619     }
620     return addedSize;
621   }
622 
623   /*
624    * Immutable data structure to hold member found in set and the set it was
625    * found in. Include set because it is carrying context.
626    */
627   private static class Member {
628     final Cell cell;
629     final NavigableSet<Cell> set;
Member(final NavigableSet<Cell> s, final Cell kv)630     Member(final NavigableSet<Cell> s, final Cell kv) {
631       this.cell = kv;
632       this.set = s;
633     }
634   }
635 
636   /*
637    * @param set Set to walk back in.  Pass a first in row or we'll return
638    * same row (loop).
639    * @param state Utility and context.
640    * @param firstOnRow First item on the row after the one we want to find a
641    * member in.
642    * @return Null or member of row previous to <code>firstOnRow</code>
643    */
memberOfPreviousRow(NavigableSet<Cell> set, final GetClosestRowBeforeTracker state, final Cell firstOnRow)644   private Member memberOfPreviousRow(NavigableSet<Cell> set,
645       final GetClosestRowBeforeTracker state, final Cell firstOnRow) {
646     NavigableSet<Cell> head = set.headSet(firstOnRow, false);
647     if (head.isEmpty()) return null;
648     for (Iterator<Cell> i = head.descendingIterator(); i.hasNext();) {
649       Cell found = i.next();
650       if (state.isExpired(found)) {
651         i.remove();
652         continue;
653       }
654       return new Member(head, found);
655     }
656     return null;
657   }
658 
659   /**
660    * @return scanner on memstore and snapshot in this order.
661    */
662   @Override
getScanners(long readPt)663   public List<KeyValueScanner> getScanners(long readPt) {
664     return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
665   }
666 
667   /**
668    * Check if this memstore may contain the required keys
669    * @param scan scan
670    * @param store holds reference to cf
671    * @param oldestUnexpiredTS
672    * @return False if the key definitely does not exist in this Memstore
673    */
shouldSeek(Scan scan, Store store, long oldestUnexpiredTS)674   public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
675     byte[] cf = store.getFamily().getName();
676     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
677     if (timeRange == null) {
678       timeRange = scan.getTimeRange();
679     }
680     return (timeRangeTracker.includesTimeRange(timeRange) ||
681         snapshotTimeRangeTracker.includesTimeRange(timeRange))
682         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
683                      snapshotTimeRangeTracker.getMaximumTimestamp()) >=
684             oldestUnexpiredTS);
685   }
686 
687   /*
688    * MemStoreScanner implements the KeyValueScanner.
689    * It lets the caller scan the contents of a memstore -- both current
690    * map and snapshot.
691    * This behaves as if it were a real scanner but does not maintain position.
692    */
693   protected class MemStoreScanner extends NonLazyKeyValueScanner {
    // Next row information for either cellSet or snapshot; both are refreshed
    // by seekInSubLists().
    private Cell cellSetNextRow = null;
    private Cell snapshotNextRow = null;

    // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek);
    // written by getNext() and cleared by seek()
    private Cell cellSetItRow = null;
    private Cell snapshotItRow = null;

    // iterator based scanning; both iterators are (re)created by seek()
    private Iterator<Cell> cellSetIt;
    private Iterator<Cell> snapshotIt;

    // The cellSet and snapshot at the time of creating this scanner
    private CellSkipListSet cellSetAtCreation;
    private CellSkipListSet snapshotAtCreation;

    // the pre-calculated Cell to be returned by peek() or next()
    private Cell theNext;

    // The allocator and snapshot allocator at the time of creating this scanner.
    // The constructor increments the scanner count on each one that is non-null.
    volatile MemStoreLAB allocatorAtCreation;
    volatile MemStoreLAB snapshotAllocatorAtCreation;

    // A flag represents whether could stop skipping Cells for MVCC
    // if have encountered the next row. Only used for reversed scan
    private boolean stopSkippingCellsIfNextRow = false;

    // Cells with a sequence id above this value are skipped by getNext().
    private long readPoint;
722 
    /*
     * Some notes...
     *
     * A MemStoreScanner is fixed at creation time; this includes its pointers/iterators
     * into the existing cellSet/snapshot. During snapshot creation, the cellSet is null and
     * the snapshot is moved. Since the cellSet is null there is no point in reseeking on
     * both, so we can save ourselves the trouble. During the snapshot->hfile transition, the
     * memstore scanner is re-created by StoreScanner#updateReaders(). StoreScanner should
     * potentially do something smarter by adjusting the existing memstore scanner.
     *
     * But there is a greater problem here: once a scanner has progressed during a snapshot
     * scenario, we currently iterate past the cellSet and then 'finish' up. If a scan lasts
     * a little while, there is a chance for new entries in the cellSet to become available,
     * but we will never see them. This needs to be handled at the StoreScanner level, in
     * coordination with MemStoreScanner.
     *
     * Currently, this problem is only partly managed: during the small amount of time when
     * the StoreScanner has not yet created a new MemStoreScanner, we will miss the adds to
     * the cellSet in the MemStoreScanner.
     */
743 
MemStoreScanner(long readPoint)744     MemStoreScanner(long readPoint) {
745       super();
746 
747       this.readPoint = readPoint;
748       cellSetAtCreation = cellSet;
749       snapshotAtCreation = snapshot;
750       if (allocator != null) {
751         this.allocatorAtCreation = allocator;
752         this.allocatorAtCreation.incScannerCount();
753       }
754       if (snapshotAllocator != null) {
755         this.snapshotAllocatorAtCreation = snapshotAllocator;
756         this.snapshotAllocatorAtCreation.incScannerCount();
757       }
758       if (Trace.isTracing() && Trace.currentSpan() != null) {
759         Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
760       }
761     }
762 
763     /**
764      * Lock on 'this' must be held by caller.
765      * @param it
766      * @return Next Cell
767      */
getNext(Iterator<Cell> it)768     private Cell getNext(Iterator<Cell> it) {
769       Cell startCell = theNext;
770       Cell v = null;
771       try {
772         while (it.hasNext()) {
773           v = it.next();
774           if (v.getSequenceId() <= this.readPoint) {
775             return v;
776           }
777           if (stopSkippingCellsIfNextRow && startCell != null
778               && comparator.compareRows(v, startCell) > 0) {
779             return null;
780           }
781         }
782 
783         return null;
784       } finally {
785         if (v != null) {
786           // in all cases, remember the last Cell iterated to
787           if (it == snapshotIt) {
788             snapshotItRow = v;
789           } else {
790             cellSetItRow = v;
791           }
792         }
793       }
794     }
795 
796     /**
797      *  Set the scanner at the seek key.
798      *  Must be called only once: there is no thread safety between the scanner
799      *   and the memStore.
800      * @param key seek value
801      * @return false if the key is null or if there is no data
802      */
803     @Override
seek(Cell key)804     public synchronized boolean seek(Cell key) {
805       if (key == null) {
806         close();
807         return false;
808       }
809       // kvset and snapshot will never be null.
810       // if tailSet can't find anything, SortedSet is empty (not null).
811       cellSetIt = cellSetAtCreation.tailSet(key).iterator();
812       snapshotIt = snapshotAtCreation.tailSet(key).iterator();
813       cellSetItRow = null;
814       snapshotItRow = null;
815 
816       return seekInSubLists(key);
817     }
818 
819 
820     /**
821      * (Re)initialize the iterators after a seek or a reseek.
822      */
seekInSubLists(Cell key)823     private synchronized boolean seekInSubLists(Cell key){
824       cellSetNextRow = getNext(cellSetIt);
825       snapshotNextRow = getNext(snapshotIt);
826 
827       // Calculate the next value
828       theNext = getLowest(cellSetNextRow, snapshotNextRow);
829 
830       // has data
831       return (theNext != null);
832     }
833 
834 
835     /**
836      * Move forward on the sub-lists set previously by seek.
837      * @param key seek value (should be non-null)
838      * @return true if there is at least one KV to read, false otherwise
839      */
840     @Override
reseek(Cell key)841     public synchronized boolean reseek(Cell key) {
842       /*
843       See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
844       This code is executed concurrently with flush and puts, without locks.
845       Two points must be known when working on this code:
846       1) It's not possible to use the 'kvTail' and 'snapshot'
847        variables, as they are modified during a flush.
848       2) The ideal implementation for performance would use the sub skip list
849        implicitly pointed by the iterators 'kvsetIt' and
850        'snapshotIt'. Unfortunately the Java API does not offer a method to
851        get it. So we remember the last keys we iterated to and restore
852        the reseeked set to at least that point.
853        */
854       cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
855       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
856 
857       return seekInSubLists(key);
858     }
859 
860 
861     @Override
peek()862     public synchronized Cell peek() {
863       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
864       return theNext;
865     }
866 
867     @Override
next()868     public synchronized Cell next() {
869       if (theNext == null) {
870           return null;
871       }
872 
873       final Cell ret = theNext;
874 
875       // Advance one of the iterators
876       if (theNext == cellSetNextRow) {
877         cellSetNextRow = getNext(cellSetIt);
878       } else {
879         snapshotNextRow = getNext(snapshotIt);
880       }
881 
882       // Calculate the next value
883       theNext = getLowest(cellSetNextRow, snapshotNextRow);
884 
885       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
886       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
887       //    getLowest() + " threadpoint=" + readpoint);
888       return ret;
889     }
890 
891     /*
892      * Returns the lower of the two key values, or null if they are both null.
893      * This uses comparator.compare() to compare the KeyValue using the memstore
894      * comparator.
895      */
getLowest(Cell first, Cell second)896     private Cell getLowest(Cell first, Cell second) {
897       if (first == null && second == null) {
898         return null;
899       }
900       if (first != null && second != null) {
901         int compare = comparator.compare(first, second);
902         return (compare <= 0 ? first : second);
903       }
904       return (first != null ? first : second);
905     }
906 
907     /*
908      * Returns the higher of the two cells, or null if they are both null.
909      * This uses comparator.compare() to compare the Cell using the memstore
910      * comparator.
911      */
getHighest(Cell first, Cell second)912     private Cell getHighest(Cell first, Cell second) {
913       if (first == null && second == null) {
914         return null;
915       }
916       if (first != null && second != null) {
917         int compare = comparator.compare(first, second);
918         return (compare > 0 ? first : second);
919       }
920       return (first != null ? first : second);
921     }
922 
close()923     public synchronized void close() {
924       this.cellSetNextRow = null;
925       this.snapshotNextRow = null;
926 
927       this.cellSetIt = null;
928       this.snapshotIt = null;
929 
930       if (allocatorAtCreation != null) {
931         this.allocatorAtCreation.decScannerCount();
932         this.allocatorAtCreation = null;
933       }
934       if (snapshotAllocatorAtCreation != null) {
935         this.snapshotAllocatorAtCreation.decScannerCount();
936         this.snapshotAllocatorAtCreation = null;
937       }
938 
939       this.cellSetItRow = null;
940       this.snapshotItRow = null;
941     }
942 
943     /**
944      * MemStoreScanner returns max value as sequence id because it will
945      * always have the latest data among all files.
946      */
947     @Override
getSequenceID()948     public long getSequenceID() {
949       return Long.MAX_VALUE;
950     }
951 
952     @Override
shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS)953     public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
954       return shouldSeek(scan, store, oldestUnexpiredTS);
955     }
956 
957     /**
958      * Seek scanner to the given key first. If it returns false(means
959      * peek()==null) or scanner's peek row is bigger than row of given key, seek
960      * the scanner to the previous row of given key
961      */
962     @Override
backwardSeek(Cell key)963     public synchronized boolean backwardSeek(Cell key) {
964       seek(key);
965       if (peek() == null || comparator.compareRows(peek(), key) > 0) {
966         return seekToPreviousRow(key);
967       }
968       return true;
969     }
970 
971     /**
972      * Separately get the KeyValue before the specified key from kvset and
973      * snapshotset, and use the row of higher one as the previous row of
974      * specified key, then seek to the first KeyValue of previous row
975      */
976     @Override
seekToPreviousRow(Cell key)977     public synchronized boolean seekToPreviousRow(Cell key) {
978       Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
979           key.getRowLength());
980       SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
981       Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
982       SortedSet<Cell> snapshotHead = snapshotAtCreation
983           .headSet(firstKeyOnRow);
984       Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
985           .last();
986       Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
987       if (lastCellBeforeRow == null) {
988         theNext = null;
989         return false;
990       }
991       Cell firstKeyOnPreviousRow = KeyValueUtil.createFirstOnRow(lastCellBeforeRow.getRowArray(),
992           lastCellBeforeRow.getRowOffset(), lastCellBeforeRow.getRowLength());
993       this.stopSkippingCellsIfNextRow = true;
994       seek(firstKeyOnPreviousRow);
995       this.stopSkippingCellsIfNextRow = false;
996       if (peek() == null
997           || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
998         return seekToPreviousRow(lastCellBeforeRow);
999       }
1000       return true;
1001     }
1002 
1003     @Override
seekToLastRow()1004     public synchronized boolean seekToLastRow() {
1005       Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
1006           .last();
1007       Cell second = snapshotAtCreation.isEmpty() ? null
1008           : snapshotAtCreation.last();
1009       Cell higherCell = getHighest(first, second);
1010       if (higherCell == null) {
1011         return false;
1012       }
1013       Cell firstCellOnLastRow = KeyValueUtil.createFirstOnRow(higherCell.getRowArray(),
1014           higherCell.getRowOffset(), higherCell.getRowLength());
1015       if (seek(firstCellOnLastRow)) {
1016         return true;
1017       } else {
1018         return seekToPreviousRow(higherCell);
1019       }
1020 
1021     }
1022   }
1023 
1024   public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
1025       + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
1026 
1027   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
1028       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
1029       (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
1030 
1031   /*
1032    * Calculate how the MemStore size has changed.  Includes overhead of the
1033    * backing Map.
1034    * @param cell
1035    * @param notpresent True if the cell was NOT present in the set.
1036    * @return Size
1037    */
heapSizeChange(final Cell cell, final boolean notpresent)1038   static long heapSizeChange(final Cell cell, final boolean notpresent) {
1039     return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
1040         + CellUtil.estimatedHeapSizeOf(cell)) : 0;
1041   }
1042 
keySize()1043   private long keySize() {
1044     return heapSize() - DEEP_OVERHEAD;
1045   }
1046 
1047   /**
1048    * Get the entire heap usage for this MemStore not including keys in the
1049    * snapshot.
1050    */
1051   @Override
heapSize()1052   public long heapSize() {
1053     return size.get();
1054   }
1055 
1056   @Override
size()1057   public long size() {
1058     return heapSize();
1059   }
1060 
1061   /**
1062    * Code to help figure if our approximation of object heap sizes is close
1063    * enough.  See hbase-900.  Fills memstores then waits so user can heap
1064    * dump and bring up resultant hprof in something like jprofiler which
1065    * allows you get 'deep size' on objects.
1066    * @param args main args
1067    */
main(String [] args)1068   public static void main(String [] args) {
1069     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
1070     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
1071       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
1072     LOG.info("vmInputArguments=" + runtime.getInputArguments());
1073     DefaultMemStore memstore1 = new DefaultMemStore();
1074     // TODO: x32 vs x64
1075     long size = 0;
1076     final int count = 10000;
1077     byte [] fam = Bytes.toBytes("col");
1078     byte [] qf = Bytes.toBytes("umn");
1079     byte [] empty = new byte[0];
1080     for (int i = 0; i < count; i++) {
1081       // Give each its own ts
1082       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1083     }
1084     LOG.info("memstore1 estimated size=" + size);
1085     for (int i = 0; i < count; i++) {
1086       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1087     }
1088     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
1089     // Make a variably sized memstore.
1090     DefaultMemStore memstore2 = new DefaultMemStore();
1091     for (int i = 0; i < count; i++) {
1092       size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]));
1093     }
1094     LOG.info("memstore2 estimated size=" + size);
1095     final int seconds = 30;
1096     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
1097     for (int i = 0; i < seconds; i++) {
1098       // Thread.sleep(1000);
1099     }
1100     LOG.info("Exiting.");
1101   }
1102 
1103 }
1104