1 /*
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 
20 package org.apache.hadoop.hbase.regionserver;
21 
22 import static org.junit.Assert.*;
23 import static org.mockito.Matchers.any;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27 
28 import java.io.IOException;
29 import java.lang.ref.SoftReference;
30 import java.security.PrivilegedExceptionAction;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.NavigableSet;
37 import java.util.concurrent.ConcurrentSkipListSet;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.FilterFileSystem;
47 import org.apache.hadoop.fs.LocalFileSystem;
48 import org.apache.hadoop.fs.Path;
49 import org.apache.hadoop.fs.permission.FsPermission;
50 import org.apache.hadoop.hbase.Cell;
51 import org.apache.hadoop.hbase.CellUtil;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HBaseTestingUtility;
54 import org.apache.hadoop.hbase.HColumnDescriptor;
55 import org.apache.hadoop.hbase.HRegionInfo;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.KeyValue;
58 import org.apache.hadoop.hbase.KeyValue.KVComparator;
59 import org.apache.hadoop.hbase.KeyValueUtil;
60 import org.apache.hadoop.hbase.testclassification.MediumTests;
61 import org.apache.hadoop.hbase.TableName;
62 import org.apache.hadoop.hbase.client.Get;
63 import org.apache.hadoop.hbase.io.compress.Compression;
64 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
65 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
66 import org.apache.hadoop.hbase.io.hfile.HFile;
67 import org.apache.hadoop.hbase.io.hfile.HFileContext;
68 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
69 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
70 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
71 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
72 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
73 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
74 import org.apache.hadoop.hbase.wal.WALFactory;
75 import org.apache.hadoop.hbase.security.User;
76 import org.apache.hadoop.hbase.util.Bytes;
77 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
79 import org.apache.hadoop.hbase.util.FSUtils;
80 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
81 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
82 import org.apache.hadoop.util.Progressable;
83 import org.junit.After;
84 import org.junit.Assert;
85 import org.junit.Before;
86 import org.junit.Rule;
87 import org.junit.Test;
88 import org.junit.experimental.categories.Category;
89 import org.junit.rules.TestName;
90 import org.mockito.Mockito;
91 
92 import com.google.common.collect.Lists;
93 
94 /**
95  * Test class for the Store
96  */
97 @Category(MediumTests.class)
98 public class TestStore {
99   private static final Log LOG = LogFactory.getLog(TestStore.class);
100   @Rule public TestName name = new TestName();
101 
102   HStore store;
103   byte [] table = Bytes.toBytes("table");
104   byte [] family = Bytes.toBytes("family");
105 
106   byte [] row = Bytes.toBytes("row");
107   byte [] row2 = Bytes.toBytes("row2");
108   byte [] qf1 = Bytes.toBytes("qf1");
109   byte [] qf2 = Bytes.toBytes("qf2");
110   byte [] qf3 = Bytes.toBytes("qf3");
111   byte [] qf4 = Bytes.toBytes("qf4");
112   byte [] qf5 = Bytes.toBytes("qf5");
113   byte [] qf6 = Bytes.toBytes("qf6");
114 
115   NavigableSet<byte[]> qualifiers =
116     new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
117 
118   List<Cell> expected = new ArrayList<Cell>();
119   List<Cell> result = new ArrayList<Cell>();
120 
121   long id = System.currentTimeMillis();
122   Get get = new Get(row);
123 
124   private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
125   private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
126 
127 
128   /**
129    * Setup
130    * @throws IOException
131    */
132   @Before
setUp()133   public void setUp() throws IOException {
134     qualifiers.add(qf1);
135     qualifiers.add(qf3);
136     qualifiers.add(qf5);
137 
138     Iterator<byte[]> iter = qualifiers.iterator();
139     while(iter.hasNext()){
140       byte [] next = iter.next();
141       expected.add(new KeyValue(row, family, next, 1, (byte[])null));
142       get.addColumn(family, next);
143     }
144   }
145 
init(String methodName)146   private void init(String methodName) throws IOException {
147     init(methodName, TEST_UTIL.getConfiguration());
148   }
149 
init(String methodName, Configuration conf)150   private void init(String methodName, Configuration conf)
151   throws IOException {
152     HColumnDescriptor hcd = new HColumnDescriptor(family);
153     // some of the tests write 4 versions and then flush
154     // (with HBASE-4241, lower versions are collected on flush)
155     hcd.setMaxVersions(4);
156     init(methodName, conf, hcd);
157   }
158 
init(String methodName, Configuration conf, HColumnDescriptor hcd)159   private void init(String methodName, Configuration conf,
160       HColumnDescriptor hcd) throws IOException {
161     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
162     init(methodName, conf, htd, hcd);
163   }
164 
165   @SuppressWarnings("deprecation")
init(String methodName, Configuration conf, HTableDescriptor htd, HColumnDescriptor hcd)166   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
167       HColumnDescriptor hcd) throws IOException {
168     //Setting up a Store
169     Path basedir = new Path(DIR+methodName);
170     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
171     final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName));
172 
173     FileSystem fs = FileSystem.get(conf);
174 
175     fs.delete(logdir, true);
176 
177     if (htd.hasFamily(hcd.getName())) {
178       htd.modifyFamily(hcd);
179     } else {
180       htd.addFamily(hcd);
181     }
182     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
183     final Configuration walConf = new Configuration(conf);
184     FSUtils.setRootDir(walConf, basedir);
185     final WALFactory wals = new WALFactory(walConf, null, methodName);
186     HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
187         info, htd, null);
188 
189     store = new HStore(region, hcd, conf);
190     return store;
191   }
192 
193   /**
194    * Test we do not lose data if we fail a flush and then close.
195    * Part of HBase-10466
196    * @throws Exception
197    */
198   @Test
testFlushSizeAccounting()199   public void testFlushSizeAccounting() throws Exception {
200     LOG.info("Setting up a faulty file system that cannot write in " +
201       this.name.getMethodName());
202     final Configuration conf = HBaseConfiguration.create();
203     // Only retry once.
204     conf.setInt("hbase.hstore.flush.retries.number", 1);
205     User user = User.createUserForTesting(conf, this.name.getMethodName(),
206       new String[]{"foo"});
207     // Inject our faulty LocalFileSystem
208     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
209     user.runAs(new PrivilegedExceptionAction<Object>() {
210       @Override
211       public Object run() throws Exception {
212         // Make sure it worked (above is sensitive to caching details in hadoop core)
213         FileSystem fs = FileSystem.get(conf);
214         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
215         FaultyFileSystem ffs = (FaultyFileSystem)fs;
216 
217         // Initialize region
218         init(name.getMethodName(), conf);
219 
220         long size = store.memstore.getFlushableSize();
221         Assert.assertEquals(0, size);
222         LOG.info("Adding some data");
223         long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
224         size = store.memstore.getFlushableSize();
225         Assert.assertEquals(kvSize, size);
226         // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
227         try {
228           LOG.info("Flushing");
229           flushStore(store, id++);
230           Assert.fail("Didn't bubble up IOE!");
231         } catch (IOException ioe) {
232           Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
233         }
234         size = store.memstore.getFlushableSize();
235         Assert.assertEquals(kvSize, size);
236         store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
237         // Even though we add a new kv, we expect the flushable size to be 'same' since we have
238         // not yet cleared the snapshot -- the above flush failed.
239         Assert.assertEquals(kvSize, size);
240         ffs.fault.set(false);
241         flushStore(store, id++);
242         size = store.memstore.getFlushableSize();
243         // Size should be the foreground kv size.
244         Assert.assertEquals(kvSize, size);
245         flushStore(store, id++);
246         size = store.memstore.getFlushableSize();
247         Assert.assertEquals(0, size);
248         return null;
249       }
250     });
251   }
252 
253   /**
254    * Verify that compression and data block encoding are respected by the
255    * Store.createWriterInTmp() method, used on store flush.
256    */
257   @Test
testCreateWriter()258   public void testCreateWriter() throws Exception {
259     Configuration conf = HBaseConfiguration.create();
260     FileSystem fs = FileSystem.get(conf);
261 
262     HColumnDescriptor hcd = new HColumnDescriptor(family);
263     hcd.setCompressionType(Compression.Algorithm.GZ);
264     hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
265     init(name.getMethodName(), conf, hcd);
266 
267     // Test createWriterInTmp()
268     StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
269     Path path = writer.getPath();
270     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
271     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
272     writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
273     writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
274     writer.close();
275 
276     // Verify that compression and encoding settings are respected
277     HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
278     Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
279     Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
280     reader.close();
281   }
282 
283   @Test
testDeleteExpiredStoreFiles()284   public void testDeleteExpiredStoreFiles() throws Exception {
285     testDeleteExpiredStoreFiles(0);
286     testDeleteExpiredStoreFiles(1);
287   }
288 
289   /*
290    * @param minVersions the MIN_VERSIONS for the column family
291    */
testDeleteExpiredStoreFiles(int minVersions)292   public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
293     int storeFileNum = 4;
294     int ttl = 4;
295     IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
296     EnvironmentEdgeManagerTestHelper.injectEdge(edge);
297 
298     Configuration conf = HBaseConfiguration.create();
299     // Enable the expired store file deletion
300     conf.setBoolean("hbase.store.delete.expired.storefile", true);
301     // Set the compaction threshold higher to avoid normal compactions.
302     conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
303 
304     HColumnDescriptor hcd = new HColumnDescriptor(family);
305     hcd.setMinVersions(minVersions);
306     hcd.setTimeToLive(ttl);
307     init(name.getMethodName() + "-" + minVersions, conf, hcd);
308 
309     long storeTtl = this.store.getScanInfo().getTtl();
310     long sleepTime = storeTtl / storeFileNum;
311     long timeStamp;
312     // There are 4 store files and the max time stamp difference among these
313     // store files will be (this.store.ttl / storeFileNum)
314     for (int i = 1; i <= storeFileNum; i++) {
315       LOG.info("Adding some data for the store file #" + i);
316       timeStamp = EnvironmentEdgeManager.currentTime();
317       this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
318       this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
319       this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
320       flush(i);
321       edge.incrementTime(sleepTime);
322     }
323 
324     // Verify the total number of store files
325     Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
326 
327      // Each call will find one expired store file and delete it before compaction happens.
328      // There will be no compaction due to threshold above. Last file will not be replaced.
329     for (int i = 1; i <= storeFileNum - 1; i++) {
330       // verify the expired store file.
331       assertNull(this.store.requestCompaction());
332       Collection<StoreFile> sfs = this.store.getStorefiles();
333       // Ensure i files are gone.
334       if (minVersions == 0) {
335         assertEquals(storeFileNum - i, sfs.size());
336         // Ensure only non-expired files remain.
337         for (StoreFile sf : sfs) {
338           assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
339         }
340       } else {
341         assertEquals(storeFileNum, sfs.size());
342       }
343       // Let the next store file expired.
344       edge.incrementTime(sleepTime);
345     }
346     assertNull(this.store.requestCompaction());
347 
348     Collection<StoreFile> sfs = this.store.getStorefiles();
349     // Assert the last expired file is not removed.
350     if (minVersions == 0) {
351       assertEquals(1, sfs.size());
352     }
353     long ts = sfs.iterator().next().getReader().getMaxTimestamp();
354     assertTrue(ts < (edge.currentTime() - storeTtl));
355 
356     for (StoreFile sf : sfs) {
357       sf.closeReader(true);
358     }
359   }
360 
361   @Test
362   public void testLowestModificationTime() throws Exception {
363     Configuration conf = HBaseConfiguration.create();
364     FileSystem fs = FileSystem.get(conf);
365     // Initialize region
366     init(name.getMethodName(), conf);
367 
368     int storeFileNum = 4;
369     for (int i = 1; i <= storeFileNum; i++) {
370       LOG.info("Adding some data for the store file #"+i);
371       this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
372       this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
373       this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
374       flush(i);
375     }
376     // after flush; check the lowest time stamp
377     long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
378     long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
379     Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
380 
381     // after compact; check the lowest time stamp
382     store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE);
383     lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
384     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
385     Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
386   }
387 
388   private static long getLowestTimeStampFromFS(FileSystem fs,
389       final Collection<StoreFile> candidates) throws IOException {
390     long minTs = Long.MAX_VALUE;
391     if (candidates.isEmpty()) {
392       return minTs;
393     }
394     Path[] p = new Path[candidates.size()];
395     int i = 0;
396     for (StoreFile sf : candidates) {
397       p[i] = sf.getPath();
398       ++i;
399     }
400 
401     FileStatus[] stats = fs.listStatus(p);
402     if (stats == null || stats.length == 0) {
403       return minTs;
404     }
405     for (FileStatus s : stats) {
406       minTs = Math.min(minTs, s.getModificationTime());
407     }
408     return minTs;
409   }
410 
411   //////////////////////////////////////////////////////////////////////////////
412   // Get tests
413   //////////////////////////////////////////////////////////////////////////////
414 
415   private static final int BLOCKSIZE_SMALL = 8192;
416   /**
417    * Test for hbase-1686.
418    * @throws IOException
419    */
420   @Test
421   public void testEmptyStoreFile() throws IOException {
422     init(this.name.getMethodName());
423     // Write a store file.
424     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
425     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
426     flush(1);
427     // Now put in place an empty store file.  Its a little tricky.  Have to
428     // do manually with hacked in sequence id.
429     StoreFile f = this.store.getStorefiles().iterator().next();
430     Path storedir = f.getPath().getParent();
431     long seqid = f.getMaxSequenceId();
432     Configuration c = HBaseConfiguration.create();
433     FileSystem fs = FileSystem.get(c);
434     HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
435     StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
436         fs)
437             .withOutputDir(storedir)
438             .withFileContext(meta)
439             .build();
440     w.appendMetadata(seqid + 1, false);
441     w.close();
442     this.store.close();
443     // Reopen it... should pick up two files
444     this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
445     Assert.assertEquals(2, this.store.getStorefilesCount());
446 
447     result = HBaseTestingUtility.getFromStoreFile(store,
448         get.getRow(),
449         qualifiers);
450     Assert.assertEquals(1, result.size());
451   }
452 
453   /**
454    * Getting data from memstore only
455    * @throws IOException
456    */
457   @Test
458   public void testGet_FromMemStoreOnly() throws IOException {
459     init(this.name.getMethodName());
460 
461     //Put data in memstore
462     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
463     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
464     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
465     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
466     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
467     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
468 
469     //Get
470     result = HBaseTestingUtility.getFromStoreFile(store,
471         get.getRow(), qualifiers);
472 
473     //Compare
474     assertCheck();
475   }
476 
477   /**
478    * Getting data from files only
479    * @throws IOException
480    */
481   @Test
482   public void testGet_FromFilesOnly() throws IOException {
483     init(this.name.getMethodName());
484 
485     //Put data in memstore
486     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
487     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
488     //flush
489     flush(1);
490 
491     //Add more data
492     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
493     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
494     //flush
495     flush(2);
496 
497     //Add more data
498     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
499     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
500     //flush
501     flush(3);
502 
503     //Get
504     result = HBaseTestingUtility.getFromStoreFile(store,
505         get.getRow(),
506         qualifiers);
507     //this.store.get(get, qualifiers, result);
508 
509     //Need to sort the result since multiple files
510     Collections.sort(result, KeyValue.COMPARATOR);
511 
512     //Compare
513     assertCheck();
514   }
515 
516   /**
517    * Getting data from memstore and files
518    * @throws IOException
519    */
520   @Test
521   public void testGet_FromMemStoreAndFiles() throws IOException {
522     init(this.name.getMethodName());
523 
524     //Put data in memstore
525     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
526     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
527     //flush
528     flush(1);
529 
530     //Add more data
531     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
532     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
533     //flush
534     flush(2);
535 
536     //Add more data
537     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
538     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
539 
540     //Get
541     result = HBaseTestingUtility.getFromStoreFile(store,
542         get.getRow(), qualifiers);
543 
544     //Need to sort the result since multiple files
545     Collections.sort(result, KeyValue.COMPARATOR);
546 
547     //Compare
548     assertCheck();
549   }
550 
551   private void flush(int storeFilessize) throws IOException{
552     this.store.snapshot();
553     flushStore(store, id++);
554     Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
555     Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
556   }
557 
558   private void assertCheck() {
559     Assert.assertEquals(expected.size(), result.size());
560     for(int i=0; i<expected.size(); i++) {
561       Assert.assertEquals(expected.get(i), result.get(i));
562     }
563   }
564 
565   //////////////////////////////////////////////////////////////////////////////
566   // IncrementColumnValue tests
567   //////////////////////////////////////////////////////////////////////////////
568   /*
569    * test the internal details of how ICV works, especially during a flush scenario.
570    */
571   @Test
572   public void testIncrementColumnValue_ICVDuringFlush()
573       throws IOException, InterruptedException {
574     init(this.name.getMethodName());
575 
576     long oldValue = 1L;
577     long newValue = 3L;
578     this.store.add(new KeyValue(row, family, qf1,
579         System.currentTimeMillis(),
580         Bytes.toBytes(oldValue)));
581 
582     // snapshot the store.
583     this.store.snapshot();
584 
585     // add other things:
586     this.store.add(new KeyValue(row, family, qf2,
587         System.currentTimeMillis(),
588         Bytes.toBytes(oldValue)));
589 
590     // update during the snapshot.
591     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
592 
593     // memstore should have grown by some amount.
594     Assert.assertTrue(ret > 0);
595 
596     // then flush.
597     flushStore(store, id++);
598     Assert.assertEquals(1, this.store.getStorefiles().size());
599     // from the one we inserted up there, and a new one
600     Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size());
601 
602     // how many key/values for this row are there?
603     Get get = new Get(row);
604     get.addColumn(family, qf1);
605     get.setMaxVersions(); // all versions.
606     List<Cell> results = new ArrayList<Cell>();
607 
608     results = HBaseTestingUtility.getFromStoreFile(store, get);
609     Assert.assertEquals(2, results.size());
610 
611     long ts1 = results.get(0).getTimestamp();
612     long ts2 = results.get(1).getTimestamp();
613 
614     Assert.assertTrue(ts1 > ts2);
615 
616     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
617     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
618   }
619 
620   @After
621   public void tearDown() throws Exception {
622     EnvironmentEdgeManagerTestHelper.reset();
623   }
624 
625   @Test
626   public void testICV_negMemstoreSize()  throws IOException {
627       init(this.name.getMethodName());
628 
629     long time = 100;
630     ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
631     ee.setValue(time);
632     EnvironmentEdgeManagerTestHelper.injectEdge(ee);
633     long newValue = 3L;
634     long size = 0;
635 
636 
637     size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
638         System.currentTimeMillis(), Bytes.toBytes(newValue)));
639     size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
640         System.currentTimeMillis(), Bytes.toBytes(newValue)));
641     size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
642         System.currentTimeMillis(), Bytes.toBytes(newValue)));
643     size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
644         System.currentTimeMillis(), Bytes.toBytes(newValue)));
645     size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
646         System.currentTimeMillis(), Bytes.toBytes(newValue)));
647 
648 
649     for ( int i = 0 ; i < 10000 ; ++i) {
650       newValue++;
651 
652       long ret = this.store.updateColumnValue(row, family, qf1, newValue);
653       long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
654 
655       if (ret != 0) System.out.println("ret: " + ret);
656       if (ret2 != 0) System.out.println("ret2: " + ret2);
657 
658       Assert.assertTrue("ret: " + ret, ret >= 0);
659       size += ret;
660       Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
661       size += ret2;
662 
663 
664       if (i % 1000 == 0)
665         ee.setValue(++time);
666     }
667 
668     long computedSize=0;
669     for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) {
670       long kvsize = DefaultMemStore.heapSizeChange(cell, true);
671       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
672       computedSize += kvsize;
673     }
Assert.assertEquals(computedSize, size)674     Assert.assertEquals(computedSize, size);
675   }
676 
677   @Test
testIncrementColumnValue_SnapshotFlushCombo()678   public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
679     ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
680     EnvironmentEdgeManagerTestHelper.injectEdge(mee);
681     init(this.name.getMethodName());
682 
683     long oldValue = 1L;
684     long newValue = 3L;
685     this.store.add(new KeyValue(row, family, qf1,
686         EnvironmentEdgeManager.currentTime(),
687         Bytes.toBytes(oldValue)));
688 
689     // snapshot the store.
690     this.store.snapshot();
691 
692     // update during the snapshot, the exact same TS as the Put (lololol)
693     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
694 
695     // memstore should have grown by some amount.
696     Assert.assertTrue(ret > 0);
697 
698     // then flush.
699     flushStore(store, id++);
700     Assert.assertEquals(1, this.store.getStorefiles().size());
701     Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size());
702 
703     // now increment again:
704     newValue += 1;
705     this.store.updateColumnValue(row, family, qf1, newValue);
706 
707     // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
708     newValue += 1;
709     this.store.updateColumnValue(row, family, qf1, newValue);
710 
711     // the second TS should be TS=2 or higher., even though 'time=1' right now.
712 
713 
714     // how many key/values for this row are there?
715     Get get = new Get(row);
716     get.addColumn(family, qf1);
717     get.setMaxVersions(); // all versions.
718     List<Cell> results = new ArrayList<Cell>();
719 
720     results = HBaseTestingUtility.getFromStoreFile(store, get);
721     Assert.assertEquals(2, results.size());
722 
723     long ts1 = results.get(0).getTimestamp();
724     long ts2 = results.get(1).getTimestamp();
725 
726     Assert.assertTrue(ts1 > ts2);
727     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
728     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
729 
730     mee.setValue(2); // time goes up slightly
731     newValue += 1;
732     this.store.updateColumnValue(row, family, qf1, newValue);
733 
734     results = HBaseTestingUtility.getFromStoreFile(store, get);
735     Assert.assertEquals(2, results.size());
736 
737     ts1 = results.get(0).getTimestamp();
738     ts2 = results.get(1).getTimestamp();
739 
740     Assert.assertTrue(ts1 > ts2);
741     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
742     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
743   }
744 
745   @Test
testHandleErrorsInFlush()746   public void testHandleErrorsInFlush() throws Exception {
747     LOG.info("Setting up a faulty file system that cannot write");
748 
749     final Configuration conf = HBaseConfiguration.create();
750     User user = User.createUserForTesting(conf,
751         "testhandleerrorsinflush", new String[]{"foo"});
752     // Inject our faulty LocalFileSystem
753     conf.setClass("fs.file.impl", FaultyFileSystem.class,
754         FileSystem.class);
755     user.runAs(new PrivilegedExceptionAction<Object>() {
756       @Override
757       public Object run() throws Exception {
758         // Make sure it worked (above is sensitive to caching details in hadoop core)
759         FileSystem fs = FileSystem.get(conf);
760         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
761 
762         // Initialize region
763         init(name.getMethodName(), conf);
764 
765         LOG.info("Adding some data");
766         store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
767         store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
768         store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
769 
770         LOG.info("Before flush, we should have no files");
771 
772         Collection<StoreFileInfo> files =
773           store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
774         Assert.assertEquals(0, files != null ? files.size() : 0);
775 
776         //flush
777         try {
778           LOG.info("Flushing");
779           flush(1);
780           Assert.fail("Didn't bubble up IOE!");
781         } catch (IOException ioe) {
782           Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
783         }
784 
785         LOG.info("After failed flush, we should still have no files!");
786         files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
787         Assert.assertEquals(0, files != null ? files.size() : 0);
788         store.getHRegion().getWAL().close();
789         return null;
790       }
791     });
792     FileSystem.closeAllForUGI(user.getUGI());
793   }
794 
795   /**
796    * Faulty file system that will fail if you write past its fault position the FIRST TIME
797    * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
798    */
799   static class FaultyFileSystem extends FilterFileSystem {
800     List<SoftReference<FaultyOutputStream>> outStreams =
801       new ArrayList<SoftReference<FaultyOutputStream>>();
802     private long faultPos = 200;
803     AtomicBoolean fault = new AtomicBoolean(true);
804 
FaultyFileSystem()805     public FaultyFileSystem() {
806       super(new LocalFileSystem());
807       System.err.println("Creating faulty!");
808     }
809 
810     @Override
create(Path p)811     public FSDataOutputStream create(Path p) throws IOException {
812       return new FaultyOutputStream(super.create(p), faultPos, fault);
813     }
814 
815     @Override
create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)816     public FSDataOutputStream create(Path f, FsPermission permission,
817         boolean overwrite, int bufferSize, short replication, long blockSize,
818         Progressable progress) throws IOException {
819       return new FaultyOutputStream(super.create(f, permission,
820           overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
821     }
822 
823     @Override
createNonRecursive(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)824     public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
825         int bufferSize, short replication, long blockSize, Progressable progress)
826     throws IOException {
827       // Fake it.  Call create instead.  The default implementation throws an IOE
828       // that this is not supported.
829       return create(f, overwrite, bufferSize, replication, blockSize, progress);
830     }
831   }
832 
833   static class FaultyOutputStream extends FSDataOutputStream {
834     volatile long faultPos = Long.MAX_VALUE;
835     private final AtomicBoolean fault;
836 
FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)837     public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
838     throws IOException {
839       super(out, null);
840       this.faultPos = faultPos;
841       this.fault = fault;
842     }
843 
844     @Override
write(byte[] buf, int offset, int length)845     public void write(byte[] buf, int offset, int length) throws IOException {
846       System.err.println("faulty stream write at pos " + getPos());
847       injectFault();
848       super.write(buf, offset, length);
849     }
850 
injectFault()851     private void injectFault() throws IOException {
852       if (this.fault.get() && getPos() >= faultPos) {
853         throw new IOException("Fault injected");
854       }
855     }
856   }
857 
flushStore(HStore store, long id)858   private static void flushStore(HStore store, long id) throws IOException {
859     StoreFlushContext storeFlushCtx = store.createFlushContext(id);
860     storeFlushCtx.prepare();
861     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
862     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
863   }
864 
865   /**
866    * Generate a list of KeyValues for testing based on given parameters
867    * @param timestamps
868    * @param numRows
869    * @param qualifier
870    * @param family
871    * @return
872    */
getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier, byte[] family)873   List<Cell> getKeyValueSet(long[] timestamps, int numRows,
874       byte[] qualifier, byte[] family) {
875     List<Cell> kvList = new ArrayList<Cell>();
876     for (int i=1;i<=numRows;i++) {
877       byte[] b = Bytes.toBytes(i);
878       for (long timestamp: timestamps) {
879         kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
880       }
881     }
882     return kvList;
883   }
884 
885   /**
886    * Test to ensure correctness when using Stores with multiple timestamps
887    * @throws IOException
888    */
889   @Test
testMultipleTimestamps()890   public void testMultipleTimestamps() throws IOException {
891     int numRows = 1;
892     long[] timestamps1 = new long[] {1,5,10,20};
893     long[] timestamps2 = new long[] {30,80};
894 
895     init(this.name.getMethodName());
896 
897     List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
898     for (Cell kv : kvList1) {
899       this.store.add(KeyValueUtil.ensureKeyValue(kv));
900     }
901 
902     this.store.snapshot();
903     flushStore(store, id++);
904 
905     List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
906     for(Cell kv : kvList2) {
907       this.store.add(KeyValueUtil.ensureKeyValue(kv));
908     }
909 
910     List<Cell> result;
911     Get get = new Get(Bytes.toBytes(1));
912     get.addColumn(family,qf1);
913 
914     get.setTimeRange(0,15);
915     result = HBaseTestingUtility.getFromStoreFile(store, get);
916     Assert.assertTrue(result.size()>0);
917 
918     get.setTimeRange(40,90);
919     result = HBaseTestingUtility.getFromStoreFile(store, get);
920     Assert.assertTrue(result.size()>0);
921 
922     get.setTimeRange(10,45);
923     result = HBaseTestingUtility.getFromStoreFile(store, get);
924     Assert.assertTrue(result.size()>0);
925 
926     get.setTimeRange(80,145);
927     result = HBaseTestingUtility.getFromStoreFile(store, get);
928     Assert.assertTrue(result.size()>0);
929 
930     get.setTimeRange(1,2);
931     result = HBaseTestingUtility.getFromStoreFile(store, get);
932     Assert.assertTrue(result.size()>0);
933 
934     get.setTimeRange(90,200);
935     result = HBaseTestingUtility.getFromStoreFile(store, get);
936     Assert.assertTrue(result.size()==0);
937   }
938 
939   /**
940    * Test for HBASE-3492 - Test split on empty colfam (no store files).
941    *
942    * @throws IOException When the IO operations fail.
943    */
944   @Test
testSplitWithEmptyColFam()945   public void testSplitWithEmptyColFam() throws IOException {
946     init(this.name.getMethodName());
947     Assert.assertNull(store.getSplitPoint());
948     store.getHRegion().forceSplit(null);
949     Assert.assertNull(store.getSplitPoint());
950     store.getHRegion().clearSplit();
951   }
952 
953   @Test
testStoreUsesConfigurationFromHcdAndHtd()954   public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
955     final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
956     long anyValue = 10;
957 
958     // We'll check that it uses correct config and propagates it appropriately by going thru
959     // the simplest "real" path I can find - "throttleCompaction", which just checks whether
960     // a number we pass in is higher than some config value, inside compactionPolicy.
961     Configuration conf = HBaseConfiguration.create();
962     conf.setLong(CONFIG_KEY, anyValue);
963     init(name.getMethodName() + "-xml", conf);
964     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
965     Assert.assertFalse(store.throttleCompaction(anyValue));
966 
967     // HTD overrides XML.
968     --anyValue;
969     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
970     HColumnDescriptor hcd = new HColumnDescriptor(family);
971     htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
972     init(name.getMethodName() + "-htd", conf, htd, hcd);
973     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
974     Assert.assertFalse(store.throttleCompaction(anyValue));
975 
976     // HCD overrides them both.
977     --anyValue;
978     hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
979     init(name.getMethodName() + "-hcd", conf, htd, hcd);
980     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
981     Assert.assertFalse(store.throttleCompaction(anyValue));
982   }
983 
984   public static class DummyStoreEngine extends DefaultStoreEngine {
985     public static DefaultCompactor lastCreatedCompactor = null;
986     @Override
createComponents( Configuration conf, Store store, KVComparator comparator)987     protected void createComponents(
988         Configuration conf, Store store, KVComparator comparator) throws IOException {
989       super.createComponents(conf, store, comparator);
990       lastCreatedCompactor = this.compactor;
991     }
992   }
993 
994   @Test
testStoreUsesSearchEngineOverride()995   public void testStoreUsesSearchEngineOverride() throws Exception {
996     Configuration conf = HBaseConfiguration.create();
997     conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
998     init(this.name.getMethodName(), conf);
999     Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
1000       this.store.storeEngine.getCompactor());
1001   }
1002 
addStoreFile()1003   private void addStoreFile() throws IOException {
1004     StoreFile f = this.store.getStorefiles().iterator().next();
1005     Path storedir = f.getPath().getParent();
1006     long seqid = this.store.getMaxSequenceId();
1007     Configuration c = TEST_UTIL.getConfiguration();
1008     FileSystem fs = FileSystem.get(c);
1009     HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1010     StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
1011         fs)
1012             .withOutputDir(storedir)
1013             .withFileContext(fileContext)
1014             .build();
1015     w.appendMetadata(seqid + 1, false);
1016     w.close();
1017     LOG.info("Added store file:" + w.getPath());
1018   }
1019 
archiveStoreFile(int index)1020   private void archiveStoreFile(int index) throws IOException {
1021     Collection<StoreFile> files = this.store.getStorefiles();
1022     StoreFile sf = null;
1023     Iterator<StoreFile> it = files.iterator();
1024     for (int i = 0; i <= index; i++) {
1025       sf = it.next();
1026     }
1027     store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
1028   }
1029 
1030   @Test
testRefreshStoreFiles()1031   public void testRefreshStoreFiles() throws Exception {
1032     init(name.getMethodName());
1033 
1034     assertEquals(0, this.store.getStorefilesCount());
1035 
1036     // add some data, flush
1037     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
1038     flush(1);
1039     assertEquals(1, this.store.getStorefilesCount());
1040 
1041     // add one more file
1042     addStoreFile();
1043 
1044     assertEquals(1, this.store.getStorefilesCount());
1045     store.refreshStoreFiles();
1046     assertEquals(2, this.store.getStorefilesCount());
1047 
1048     // add three more files
1049     addStoreFile();
1050     addStoreFile();
1051     addStoreFile();
1052 
1053     assertEquals(2, this.store.getStorefilesCount());
1054     store.refreshStoreFiles();
1055     assertEquals(5, this.store.getStorefilesCount());
1056 
1057     archiveStoreFile(0);
1058 
1059     assertEquals(5, this.store.getStorefilesCount());
1060     store.refreshStoreFiles();
1061     assertEquals(4, this.store.getStorefilesCount());
1062 
1063     archiveStoreFile(0);
1064     archiveStoreFile(1);
1065     archiveStoreFile(2);
1066 
1067     assertEquals(4, this.store.getStorefilesCount());
1068     store.refreshStoreFiles();
1069     assertEquals(1, this.store.getStorefilesCount());
1070 
1071     archiveStoreFile(0);
1072     store.refreshStoreFiles();
1073     assertEquals(0, this.store.getStorefilesCount());
1074   }
1075 
1076   @SuppressWarnings("unchecked")
1077   @Test
testRefreshStoreFilesNotChanged()1078   public void testRefreshStoreFilesNotChanged() throws IOException {
1079     init(name.getMethodName());
1080 
1081     assertEquals(0, this.store.getStorefilesCount());
1082 
1083     // add some data, flush
1084     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
1085     flush(1);
1086     // add one more file
1087     addStoreFile();
1088 
1089     HStore spiedStore = spy(store);
1090 
1091     // call first time after files changed
1092     spiedStore.refreshStoreFiles();
1093     assertEquals(2, this.store.getStorefilesCount());
1094     verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class));
1095 
1096     // call second time
1097     spiedStore.refreshStoreFiles();
1098 
1099     //ensure that replaceStoreFiles is not called if files are not refreshed
1100     verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1101   }
1102 }
1103