1 /* 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 20 package org.apache.hadoop.hbase.regionserver; 21 22 import static org.junit.Assert.*; 23 import static org.mockito.Matchers.any; 24 import static org.mockito.Mockito.spy; 25 import static org.mockito.Mockito.times; 26 import static org.mockito.Mockito.verify; 27 28 import java.io.IOException; 29 import java.lang.ref.SoftReference; 30 import java.security.PrivilegedExceptionAction; 31 import java.util.ArrayList; 32 import java.util.Collection; 33 import java.util.Collections; 34 import java.util.Iterator; 35 import java.util.List; 36 import java.util.NavigableSet; 37 import java.util.concurrent.ConcurrentSkipListSet; 38 import java.util.concurrent.atomic.AtomicBoolean; 39 40 import org.apache.commons.logging.Log; 41 import org.apache.commons.logging.LogFactory; 42 import org.apache.hadoop.conf.Configuration; 43 import org.apache.hadoop.fs.FSDataOutputStream; 44 import org.apache.hadoop.fs.FileStatus; 45 import org.apache.hadoop.fs.FileSystem; 46 import org.apache.hadoop.fs.FilterFileSystem; 47 import org.apache.hadoop.fs.LocalFileSystem; 48 import org.apache.hadoop.fs.Path; 49 import org.apache.hadoop.fs.permission.FsPermission; 50 import org.apache.hadoop.hbase.Cell; 51 import org.apache.hadoop.hbase.CellUtil; 52 import org.apache.hadoop.hbase.HBaseConfiguration; 53 import org.apache.hadoop.hbase.HBaseTestingUtility; 54 import org.apache.hadoop.hbase.HColumnDescriptor; 55 import org.apache.hadoop.hbase.HRegionInfo; 56 import org.apache.hadoop.hbase.HTableDescriptor; 57 import org.apache.hadoop.hbase.KeyValue; 58 import org.apache.hadoop.hbase.KeyValue.KVComparator; 59 import org.apache.hadoop.hbase.KeyValueUtil; 60 import org.apache.hadoop.hbase.testclassification.MediumTests; 61 import org.apache.hadoop.hbase.TableName; 62 import org.apache.hadoop.hbase.client.Get; 63 import org.apache.hadoop.hbase.io.compress.Compression; 64 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 65 import org.apache.hadoop.hbase.io.hfile.CacheConfig; 66 import org.apache.hadoop.hbase.io.hfile.HFile; 67 import org.apache.hadoop.hbase.io.hfile.HFileContext; 68 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 69 import org.apache.hadoop.hbase.monitoring.MonitoredTask; 70 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 71 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 72 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; 73 import org.apache.hadoop.hbase.wal.DefaultWALProvider; 74 import org.apache.hadoop.hbase.wal.WALFactory; 75 import org.apache.hadoop.hbase.security.User; 76 import org.apache.hadoop.hbase.util.Bytes; 77 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 78 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 79 import org.apache.hadoop.hbase.util.FSUtils; 80 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 81 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 82 import org.apache.hadoop.util.Progressable; 83 import org.junit.After; 84 import org.junit.Assert; 85 import org.junit.Before; 86 import org.junit.Rule; 87 import org.junit.Test; 88 import org.junit.experimental.categories.Category; 89 import org.junit.rules.TestName; 90 import org.mockito.Mockito; 91 92 import com.google.common.collect.Lists; 93 94 /** 95 * Test class for the Store 96 */ 97 @Category(MediumTests.class) 98 public class TestStore { 99 private static final Log LOG = LogFactory.getLog(TestStore.class); 100 @Rule public TestName name = new TestName(); 101 102 HStore store; 103 byte [] table = Bytes.toBytes("table"); 104 byte [] family = Bytes.toBytes("family"); 105 106 byte [] row = Bytes.toBytes("row"); 107 byte [] row2 = Bytes.toBytes("row2"); 108 byte [] qf1 = Bytes.toBytes("qf1"); 109 byte [] qf2 = Bytes.toBytes("qf2"); 110 byte [] qf3 = Bytes.toBytes("qf3"); 111 byte [] qf4 = Bytes.toBytes("qf4"); 112 byte [] qf5 = Bytes.toBytes("qf5"); 113 byte [] qf6 = Bytes.toBytes("qf6"); 114 115 NavigableSet<byte[]> qualifiers = 116 new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR); 117 118 List<Cell> expected = new ArrayList<Cell>(); 119 List<Cell> result = new ArrayList<Cell>(); 120 121 long id = System.currentTimeMillis(); 122 Get get = new Get(row); 123 124 private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 125 private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 126 127 128 /** 129 * Setup 130 * @throws IOException 131 */ 132 @Before setUp()133 public void setUp() throws IOException { 134 qualifiers.add(qf1); 135 qualifiers.add(qf3); 136 qualifiers.add(qf5); 137 138 Iterator<byte[]> iter = qualifiers.iterator(); 139 while(iter.hasNext()){ 140 byte [] next = iter.next(); 141 expected.add(new KeyValue(row, family, next, 1, (byte[])null)); 142 get.addColumn(family, next); 143 } 144 } 145 init(String methodName)146 private void init(String methodName) throws IOException { 147 init(methodName, TEST_UTIL.getConfiguration()); 148 } 149 init(String methodName, Configuration conf)150 private void init(String methodName, Configuration conf) 151 throws IOException { 152 HColumnDescriptor hcd = new HColumnDescriptor(family); 153 // some of the tests write 4 versions and then flush 154 // (with HBASE-4241, lower versions are collected on flush) 155 hcd.setMaxVersions(4); 156 init(methodName, conf, hcd); 157 } 158 init(String methodName, Configuration conf, HColumnDescriptor hcd)159 private void init(String methodName, Configuration conf, 160 HColumnDescriptor hcd) throws IOException { 161 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); 162 init(methodName, conf, htd, hcd); 163 } 164 165 @SuppressWarnings("deprecation") init(String methodName, Configuration conf, HTableDescriptor htd, HColumnDescriptor hcd)166 private Store init(String methodName, Configuration conf, HTableDescriptor htd, 167 HColumnDescriptor hcd) throws IOException { 168 //Setting up a Store 169 Path basedir = new Path(DIR+methodName); 170 Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); 171 final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName)); 172 173 FileSystem fs = FileSystem.get(conf); 174 175 fs.delete(logdir, true); 176 177 if (htd.hasFamily(hcd.getName())) { 178 htd.modifyFamily(hcd); 179 } else { 180 htd.addFamily(hcd); 181 } 182 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 183 final Configuration walConf = new Configuration(conf); 184 FSUtils.setRootDir(walConf, basedir); 185 final WALFactory wals = new WALFactory(walConf, null, methodName); 186 HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf, 187 info, htd, null); 188 189 store = new HStore(region, hcd, conf); 190 return store; 191 } 192 193 /** 194 * Test we do not lose data if we fail a flush and then close. 195 * Part of HBase-10466 196 * @throws Exception 197 */ 198 @Test testFlushSizeAccounting()199 public void testFlushSizeAccounting() throws Exception { 200 LOG.info("Setting up a faulty file system that cannot write in " + 201 this.name.getMethodName()); 202 final Configuration conf = HBaseConfiguration.create(); 203 // Only retry once. 204 conf.setInt("hbase.hstore.flush.retries.number", 1); 205 User user = User.createUserForTesting(conf, this.name.getMethodName(), 206 new String[]{"foo"}); 207 // Inject our faulty LocalFileSystem 208 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 209 user.runAs(new PrivilegedExceptionAction<Object>() { 210 @Override 211 public Object run() throws Exception { 212 // Make sure it worked (above is sensitive to caching details in hadoop core) 213 FileSystem fs = FileSystem.get(conf); 214 Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); 215 FaultyFileSystem ffs = (FaultyFileSystem)fs; 216 217 // Initialize region 218 init(name.getMethodName(), conf); 219 220 long size = store.memstore.getFlushableSize(); 221 Assert.assertEquals(0, size); 222 LOG.info("Adding some data"); 223 long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); 224 size = store.memstore.getFlushableSize(); 225 Assert.assertEquals(kvSize, size); 226 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 227 try { 228 LOG.info("Flushing"); 229 flushStore(store, id++); 230 Assert.fail("Didn't bubble up IOE!"); 231 } catch (IOException ioe) { 232 Assert.assertTrue(ioe.getMessage().contains("Fault injected")); 233 } 234 size = store.memstore.getFlushableSize(); 235 Assert.assertEquals(kvSize, size); 236 store.add(new KeyValue(row, family, qf2, 2, (byte[])null)); 237 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 238 // not yet cleared the snapshot -- the above flush failed. 239 Assert.assertEquals(kvSize, size); 240 ffs.fault.set(false); 241 flushStore(store, id++); 242 size = store.memstore.getFlushableSize(); 243 // Size should be the foreground kv size. 244 Assert.assertEquals(kvSize, size); 245 flushStore(store, id++); 246 size = store.memstore.getFlushableSize(); 247 Assert.assertEquals(0, size); 248 return null; 249 } 250 }); 251 } 252 253 /** 254 * Verify that compression and data block encoding are respected by the 255 * Store.createWriterInTmp() method, used on store flush. 256 */ 257 @Test testCreateWriter()258 public void testCreateWriter() throws Exception { 259 Configuration conf = HBaseConfiguration.create(); 260 FileSystem fs = FileSystem.get(conf); 261 262 HColumnDescriptor hcd = new HColumnDescriptor(family); 263 hcd.setCompressionType(Compression.Algorithm.GZ); 264 hcd.setDataBlockEncoding(DataBlockEncoding.DIFF); 265 init(name.getMethodName(), conf, hcd); 266 267 // Test createWriterInTmp() 268 StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false); 269 Path path = writer.getPath(); 270 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 271 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 272 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 273 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 274 writer.close(); 275 276 // Verify that compression and encoding settings are respected 277 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf); 278 Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm()); 279 Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 280 reader.close(); 281 } 282 283 @Test testDeleteExpiredStoreFiles()284 public void testDeleteExpiredStoreFiles() throws Exception { 285 testDeleteExpiredStoreFiles(0); 286 testDeleteExpiredStoreFiles(1); 287 } 288 289 /* 290 * @param minVersions the MIN_VERSIONS for the column family 291 */ testDeleteExpiredStoreFiles(int minVersions)292 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 293 int storeFileNum = 4; 294 int ttl = 4; 295 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 296 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 297 298 Configuration conf = HBaseConfiguration.create(); 299 // Enable the expired store file deletion 300 conf.setBoolean("hbase.store.delete.expired.storefile", true); 301 // Set the compaction threshold higher to avoid normal compactions. 302 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 303 304 HColumnDescriptor hcd = new HColumnDescriptor(family); 305 hcd.setMinVersions(minVersions); 306 hcd.setTimeToLive(ttl); 307 init(name.getMethodName() + "-" + minVersions, conf, hcd); 308 309 long storeTtl = this.store.getScanInfo().getTtl(); 310 long sleepTime = storeTtl / storeFileNum; 311 long timeStamp; 312 // There are 4 store files and the max time stamp difference among these 313 // store files will be (this.store.ttl / storeFileNum) 314 for (int i = 1; i <= storeFileNum; i++) { 315 LOG.info("Adding some data for the store file #" + i); 316 timeStamp = EnvironmentEdgeManager.currentTime(); 317 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null)); 318 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null)); 319 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null)); 320 flush(i); 321 edge.incrementTime(sleepTime); 322 } 323 324 // Verify the total number of store files 325 Assert.assertEquals(storeFileNum, this.store.getStorefiles().size()); 326 327 // Each call will find one expired store file and delete it before compaction happens. 328 // There will be no compaction due to threshold above. Last file will not be replaced. 329 for (int i = 1; i <= storeFileNum - 1; i++) { 330 // verify the expired store file. 331 assertNull(this.store.requestCompaction()); 332 Collection<StoreFile> sfs = this.store.getStorefiles(); 333 // Ensure i files are gone. 334 if (minVersions == 0) { 335 assertEquals(storeFileNum - i, sfs.size()); 336 // Ensure only non-expired files remain. 337 for (StoreFile sf : sfs) { 338 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 339 } 340 } else { 341 assertEquals(storeFileNum, sfs.size()); 342 } 343 // Let the next store file expired. 344 edge.incrementTime(sleepTime); 345 } 346 assertNull(this.store.requestCompaction()); 347 348 Collection<StoreFile> sfs = this.store.getStorefiles(); 349 // Assert the last expired file is not removed. 350 if (minVersions == 0) { 351 assertEquals(1, sfs.size()); 352 } 353 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 354 assertTrue(ts < (edge.currentTime() - storeTtl)); 355 356 for (StoreFile sf : sfs) { 357 sf.closeReader(true); 358 } 359 } 360 361 @Test 362 public void testLowestModificationTime() throws Exception { 363 Configuration conf = HBaseConfiguration.create(); 364 FileSystem fs = FileSystem.get(conf); 365 // Initialize region 366 init(name.getMethodName(), conf); 367 368 int storeFileNum = 4; 369 for (int i = 1; i <= storeFileNum; i++) { 370 LOG.info("Adding some data for the store file #"+i); 371 this.store.add(new KeyValue(row, family, qf1, i, (byte[])null)); 372 this.store.add(new KeyValue(row, family, qf2, i, (byte[])null)); 373 this.store.add(new KeyValue(row, family, qf3, i, (byte[])null)); 374 flush(i); 375 } 376 // after flush; check the lowest time stamp 377 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 378 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 379 Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); 380 381 // after compact; check the lowest time stamp 382 store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE); 383 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 384 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 385 Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 386 } 387 388 private static long getLowestTimeStampFromFS(FileSystem fs, 389 final Collection<StoreFile> candidates) throws IOException { 390 long minTs = Long.MAX_VALUE; 391 if (candidates.isEmpty()) { 392 return minTs; 393 } 394 Path[] p = new Path[candidates.size()]; 395 int i = 0; 396 for (StoreFile sf : candidates) { 397 p[i] = sf.getPath(); 398 ++i; 399 } 400 401 FileStatus[] stats = fs.listStatus(p); 402 if (stats == null || stats.length == 0) { 403 return minTs; 404 } 405 for (FileStatus s : stats) { 406 minTs = Math.min(minTs, s.getModificationTime()); 407 } 408 return minTs; 409 } 410 411 ////////////////////////////////////////////////////////////////////////////// 412 // Get tests 413 ////////////////////////////////////////////////////////////////////////////// 414 415 private static final int BLOCKSIZE_SMALL = 8192; 416 /** 417 * Test for hbase-1686. 418 * @throws IOException 419 */ 420 @Test 421 public void testEmptyStoreFile() throws IOException { 422 init(this.name.getMethodName()); 423 // Write a store file. 424 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); 425 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); 426 flush(1); 427 // Now put in place an empty store file. Its a little tricky. Have to 428 // do manually with hacked in sequence id. 429 StoreFile f = this.store.getStorefiles().iterator().next(); 430 Path storedir = f.getPath().getParent(); 431 long seqid = f.getMaxSequenceId(); 432 Configuration c = HBaseConfiguration.create(); 433 FileSystem fs = FileSystem.get(c); 434 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 435 StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c), 436 fs) 437 .withOutputDir(storedir) 438 .withFileContext(meta) 439 .build(); 440 w.appendMetadata(seqid + 1, false); 441 w.close(); 442 this.store.close(); 443 // Reopen it... should pick up two files 444 this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c); 445 Assert.assertEquals(2, this.store.getStorefilesCount()); 446 447 result = HBaseTestingUtility.getFromStoreFile(store, 448 get.getRow(), 449 qualifiers); 450 Assert.assertEquals(1, result.size()); 451 } 452 453 /** 454 * Getting data from memstore only 455 * @throws IOException 456 */ 457 @Test 458 public void testGet_FromMemStoreOnly() throws IOException { 459 init(this.name.getMethodName()); 460 461 //Put data in memstore 462 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); 463 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); 464 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); 465 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); 466 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); 467 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); 468 469 //Get 470 result = HBaseTestingUtility.getFromStoreFile(store, 471 get.getRow(), qualifiers); 472 473 //Compare 474 assertCheck(); 475 } 476 477 /** 478 * Getting data from files only 479 * @throws IOException 480 */ 481 @Test 482 public void testGet_FromFilesOnly() throws IOException { 483 init(this.name.getMethodName()); 484 485 //Put data in memstore 486 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); 487 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); 488 //flush 489 flush(1); 490 491 //Add more data 492 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); 493 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); 494 //flush 495 flush(2); 496 497 //Add more data 498 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); 499 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); 500 //flush 501 flush(3); 502 503 //Get 504 result = HBaseTestingUtility.getFromStoreFile(store, 505 get.getRow(), 506 qualifiers); 507 //this.store.get(get, qualifiers, result); 508 509 //Need to sort the result since multiple files 510 Collections.sort(result, KeyValue.COMPARATOR); 511 512 //Compare 513 assertCheck(); 514 } 515 516 /** 517 * Getting data from memstore and files 518 * @throws IOException 519 */ 520 @Test 521 public void testGet_FromMemStoreAndFiles() throws IOException { 522 init(this.name.getMethodName()); 523 524 //Put data in memstore 525 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); 526 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); 527 //flush 528 flush(1); 529 530 //Add more data 531 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); 532 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); 533 //flush 534 flush(2); 535 536 //Add more data 537 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); 538 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); 539 540 //Get 541 result = HBaseTestingUtility.getFromStoreFile(store, 542 get.getRow(), qualifiers); 543 544 //Need to sort the result since multiple files 545 Collections.sort(result, KeyValue.COMPARATOR); 546 547 //Compare 548 assertCheck(); 549 } 550 551 private void flush(int storeFilessize) throws IOException{ 552 this.store.snapshot(); 553 flushStore(store, id++); 554 Assert.assertEquals(storeFilessize, this.store.getStorefiles().size()); 555 Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size()); 556 } 557 558 private void assertCheck() { 559 Assert.assertEquals(expected.size(), result.size()); 560 for(int i=0; i<expected.size(); i++) { 561 Assert.assertEquals(expected.get(i), result.get(i)); 562 } 563 } 564 565 ////////////////////////////////////////////////////////////////////////////// 566 // IncrementColumnValue tests 567 ////////////////////////////////////////////////////////////////////////////// 568 /* 569 * test the internal details of how ICV works, especially during a flush scenario. 570 */ 571 @Test 572 public void testIncrementColumnValue_ICVDuringFlush() 573 throws IOException, InterruptedException { 574 init(this.name.getMethodName()); 575 576 long oldValue = 1L; 577 long newValue = 3L; 578 this.store.add(new KeyValue(row, family, qf1, 579 System.currentTimeMillis(), 580 Bytes.toBytes(oldValue))); 581 582 // snapshot the store. 583 this.store.snapshot(); 584 585 // add other things: 586 this.store.add(new KeyValue(row, family, qf2, 587 System.currentTimeMillis(), 588 Bytes.toBytes(oldValue))); 589 590 // update during the snapshot. 591 long ret = this.store.updateColumnValue(row, family, qf1, newValue); 592 593 // memstore should have grown by some amount. 594 Assert.assertTrue(ret > 0); 595 596 // then flush. 597 flushStore(store, id++); 598 Assert.assertEquals(1, this.store.getStorefiles().size()); 599 // from the one we inserted up there, and a new one 600 Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size()); 601 602 // how many key/values for this row are there? 603 Get get = new Get(row); 604 get.addColumn(family, qf1); 605 get.setMaxVersions(); // all versions. 606 List<Cell> results = new ArrayList<Cell>(); 607 608 results = HBaseTestingUtility.getFromStoreFile(store, get); 609 Assert.assertEquals(2, results.size()); 610 611 long ts1 = results.get(0).getTimestamp(); 612 long ts2 = results.get(1).getTimestamp(); 613 614 Assert.assertTrue(ts1 > ts2); 615 616 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); 617 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); 618 } 619 620 @After 621 public void tearDown() throws Exception { 622 EnvironmentEdgeManagerTestHelper.reset(); 623 } 624 625 @Test 626 public void testICV_negMemstoreSize() throws IOException { 627 init(this.name.getMethodName()); 628 629 long time = 100; 630 ManualEnvironmentEdge ee = new ManualEnvironmentEdge(); 631 ee.setValue(time); 632 EnvironmentEdgeManagerTestHelper.injectEdge(ee); 633 long newValue = 3L; 634 long size = 0; 635 636 637 size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1, 638 System.currentTimeMillis(), Bytes.toBytes(newValue))); 639 size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1, 640 System.currentTimeMillis(), Bytes.toBytes(newValue))); 641 size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1, 642 System.currentTimeMillis(), Bytes.toBytes(newValue))); 643 size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1, 644 System.currentTimeMillis(), Bytes.toBytes(newValue))); 645 size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1, 646 System.currentTimeMillis(), Bytes.toBytes(newValue))); 647 648 649 for ( int i = 0 ; i < 10000 ; ++i) { 650 newValue++; 651 652 long ret = this.store.updateColumnValue(row, family, qf1, newValue); 653 long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue); 654 655 if (ret != 0) System.out.println("ret: " + ret); 656 if (ret2 != 0) System.out.println("ret2: " + ret2); 657 658 Assert.assertTrue("ret: " + ret, ret >= 0); 659 size += ret; 660 Assert.assertTrue("ret2: " + ret2, ret2 >= 0); 661 size += ret2; 662 663 664 if (i % 1000 == 0) 665 ee.setValue(++time); 666 } 667 668 long computedSize=0; 669 for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) { 670 long kvsize = DefaultMemStore.heapSizeChange(cell, true); 671 //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize()); 672 computedSize += kvsize; 673 } Assert.assertEquals(computedSize, size)674 Assert.assertEquals(computedSize, size); 675 } 676 677 @Test testIncrementColumnValue_SnapshotFlushCombo()678 public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception { 679 ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 680 EnvironmentEdgeManagerTestHelper.injectEdge(mee); 681 init(this.name.getMethodName()); 682 683 long oldValue = 1L; 684 long newValue = 3L; 685 this.store.add(new KeyValue(row, family, qf1, 686 EnvironmentEdgeManager.currentTime(), 687 Bytes.toBytes(oldValue))); 688 689 // snapshot the store. 690 this.store.snapshot(); 691 692 // update during the snapshot, the exact same TS as the Put (lololol) 693 long ret = this.store.updateColumnValue(row, family, qf1, newValue); 694 695 // memstore should have grown by some amount. 696 Assert.assertTrue(ret > 0); 697 698 // then flush. 699 flushStore(store, id++); 700 Assert.assertEquals(1, this.store.getStorefiles().size()); 701 Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size()); 702 703 // now increment again: 704 newValue += 1; 705 this.store.updateColumnValue(row, family, qf1, newValue); 706 707 // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again: 708 newValue += 1; 709 this.store.updateColumnValue(row, family, qf1, newValue); 710 711 // the second TS should be TS=2 or higher., even though 'time=1' right now. 712 713 714 // how many key/values for this row are there? 715 Get get = new Get(row); 716 get.addColumn(family, qf1); 717 get.setMaxVersions(); // all versions. 718 List<Cell> results = new ArrayList<Cell>(); 719 720 results = HBaseTestingUtility.getFromStoreFile(store, get); 721 Assert.assertEquals(2, results.size()); 722 723 long ts1 = results.get(0).getTimestamp(); 724 long ts2 = results.get(1).getTimestamp(); 725 726 Assert.assertTrue(ts1 > ts2); 727 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); 728 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); 729 730 mee.setValue(2); // time goes up slightly 731 newValue += 1; 732 this.store.updateColumnValue(row, family, qf1, newValue); 733 734 results = HBaseTestingUtility.getFromStoreFile(store, get); 735 Assert.assertEquals(2, results.size()); 736 737 ts1 = results.get(0).getTimestamp(); 738 ts2 = results.get(1).getTimestamp(); 739 740 Assert.assertTrue(ts1 > ts2); 741 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); 742 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); 743 } 744 745 @Test testHandleErrorsInFlush()746 public void testHandleErrorsInFlush() throws Exception { 747 LOG.info("Setting up a faulty file system that cannot write"); 748 749 final Configuration conf = HBaseConfiguration.create(); 750 User user = User.createUserForTesting(conf, 751 "testhandleerrorsinflush", new String[]{"foo"}); 752 // Inject our faulty LocalFileSystem 753 conf.setClass("fs.file.impl", FaultyFileSystem.class, 754 FileSystem.class); 755 user.runAs(new PrivilegedExceptionAction<Object>() { 756 @Override 757 public Object run() throws Exception { 758 // Make sure it worked (above is sensitive to caching details in hadoop core) 759 FileSystem fs = FileSystem.get(conf); 760 Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); 761 762 // Initialize region 763 init(name.getMethodName(), conf); 764 765 LOG.info("Adding some data"); 766 store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); 767 store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); 768 store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); 769 770 LOG.info("Before flush, we should have no files"); 771 772 Collection<StoreFileInfo> files = 773 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 774 Assert.assertEquals(0, files != null ? files.size() : 0); 775 776 //flush 777 try { 778 LOG.info("Flushing"); 779 flush(1); 780 Assert.fail("Didn't bubble up IOE!"); 781 } catch (IOException ioe) { 782 Assert.assertTrue(ioe.getMessage().contains("Fault injected")); 783 } 784 785 LOG.info("After failed flush, we should still have no files!"); 786 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 787 Assert.assertEquals(0, files != null ? files.size() : 0); 788 store.getHRegion().getWAL().close(); 789 return null; 790 } 791 }); 792 FileSystem.closeAllForUGI(user.getUGI()); 793 } 794 795 /** 796 * Faulty file system that will fail if you write past its fault position the FIRST TIME 797 * only; thereafter it will succeed. Used by {@link TestHRegion} too. 798 */ 799 static class FaultyFileSystem extends FilterFileSystem { 800 List<SoftReference<FaultyOutputStream>> outStreams = 801 new ArrayList<SoftReference<FaultyOutputStream>>(); 802 private long faultPos = 200; 803 AtomicBoolean fault = new AtomicBoolean(true); 804 FaultyFileSystem()805 public FaultyFileSystem() { 806 super(new LocalFileSystem()); 807 System.err.println("Creating faulty!"); 808 } 809 810 @Override create(Path p)811 public FSDataOutputStream create(Path p) throws IOException { 812 return new FaultyOutputStream(super.create(p), faultPos, fault); 813 } 814 815 @Override create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)816 public FSDataOutputStream create(Path f, FsPermission permission, 817 boolean overwrite, int bufferSize, short replication, long blockSize, 818 Progressable progress) throws IOException { 819 return new FaultyOutputStream(super.create(f, permission, 820 overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); 821 } 822 823 @Override createNonRecursive(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)824 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, 825 int bufferSize, short replication, long blockSize, Progressable progress) 826 throws IOException { 827 // Fake it. Call create instead. The default implementation throws an IOE 828 // that this is not supported. 829 return create(f, overwrite, bufferSize, replication, blockSize, progress); 830 } 831 } 832 833 static class FaultyOutputStream extends FSDataOutputStream { 834 volatile long faultPos = Long.MAX_VALUE; 835 private final AtomicBoolean fault; 836 FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)837 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 838 throws IOException { 839 super(out, null); 840 this.faultPos = faultPos; 841 this.fault = fault; 842 } 843 844 @Override write(byte[] buf, int offset, int length)845 public void write(byte[] buf, int offset, int length) throws IOException { 846 System.err.println("faulty stream write at pos " + getPos()); 847 injectFault(); 848 super.write(buf, offset, length); 849 } 850 injectFault()851 private void injectFault() throws IOException { 852 if (this.fault.get() && getPos() >= faultPos) { 853 throw new IOException("Fault injected"); 854 } 855 } 856 } 857 flushStore(HStore store, long id)858 private static void flushStore(HStore store, long id) throws IOException { 859 StoreFlushContext storeFlushCtx = store.createFlushContext(id); 860 storeFlushCtx.prepare(); 861 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 862 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 863 } 864 865 /** 866 * Generate a list of KeyValues for testing based on given parameters 867 * @param timestamps 868 * @param numRows 869 * @param qualifier 870 * @param family 871 * @return 872 */ getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier, byte[] family)873 List<Cell> getKeyValueSet(long[] timestamps, int numRows, 874 byte[] qualifier, byte[] family) { 875 List<Cell> kvList = new ArrayList<Cell>(); 876 for (int i=1;i<=numRows;i++) { 877 byte[] b = Bytes.toBytes(i); 878 for (long timestamp: timestamps) { 879 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 880 } 881 } 882 return kvList; 883 } 884 885 /** 886 * Test to ensure correctness when using Stores with multiple timestamps 887 * @throws IOException 888 */ 889 @Test testMultipleTimestamps()890 public void testMultipleTimestamps() throws IOException { 891 int numRows = 1; 892 long[] timestamps1 = new long[] {1,5,10,20}; 893 long[] timestamps2 = new long[] {30,80}; 894 895 init(this.name.getMethodName()); 896 897 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); 898 for (Cell kv : kvList1) { 899 this.store.add(KeyValueUtil.ensureKeyValue(kv)); 900 } 901 902 this.store.snapshot(); 903 flushStore(store, id++); 904 905 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); 906 for(Cell kv : kvList2) { 907 this.store.add(KeyValueUtil.ensureKeyValue(kv)); 908 } 909 910 List<Cell> result; 911 Get get = new Get(Bytes.toBytes(1)); 912 get.addColumn(family,qf1); 913 914 get.setTimeRange(0,15); 915 result = HBaseTestingUtility.getFromStoreFile(store, get); 916 Assert.assertTrue(result.size()>0); 917 918 get.setTimeRange(40,90); 919 result = HBaseTestingUtility.getFromStoreFile(store, get); 920 Assert.assertTrue(result.size()>0); 921 922 get.setTimeRange(10,45); 923 result = HBaseTestingUtility.getFromStoreFile(store, get); 924 Assert.assertTrue(result.size()>0); 925 926 get.setTimeRange(80,145); 927 result = HBaseTestingUtility.getFromStoreFile(store, get); 928 Assert.assertTrue(result.size()>0); 929 930 get.setTimeRange(1,2); 931 result = HBaseTestingUtility.getFromStoreFile(store, get); 932 Assert.assertTrue(result.size()>0); 933 934 get.setTimeRange(90,200); 935 result = HBaseTestingUtility.getFromStoreFile(store, get); 936 Assert.assertTrue(result.size()==0); 937 } 938 939 /** 940 * Test for HBASE-3492 - Test split on empty colfam (no store files). 941 * 942 * @throws IOException When the IO operations fail. 943 */ 944 @Test testSplitWithEmptyColFam()945 public void testSplitWithEmptyColFam() throws IOException { 946 init(this.name.getMethodName()); 947 Assert.assertNull(store.getSplitPoint()); 948 store.getHRegion().forceSplit(null); 949 Assert.assertNull(store.getSplitPoint()); 950 store.getHRegion().clearSplit(); 951 } 952 953 @Test testStoreUsesConfigurationFromHcdAndHtd()954 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 955 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 956 long anyValue = 10; 957 958 // We'll check that it uses correct config and propagates it appropriately by going thru 959 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 960 // a number we pass in is higher than some config value, inside compactionPolicy. 961 Configuration conf = HBaseConfiguration.create(); 962 conf.setLong(CONFIG_KEY, anyValue); 963 init(name.getMethodName() + "-xml", conf); 964 Assert.assertTrue(store.throttleCompaction(anyValue + 1)); 965 Assert.assertFalse(store.throttleCompaction(anyValue)); 966 967 // HTD overrides XML. 968 --anyValue; 969 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); 970 HColumnDescriptor hcd = new HColumnDescriptor(family); 971 htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue)); 972 init(name.getMethodName() + "-htd", conf, htd, hcd); 973 Assert.assertTrue(store.throttleCompaction(anyValue + 1)); 974 Assert.assertFalse(store.throttleCompaction(anyValue)); 975 976 // HCD overrides them both. 977 --anyValue; 978 hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue)); 979 init(name.getMethodName() + "-hcd", conf, htd, hcd); 980 Assert.assertTrue(store.throttleCompaction(anyValue + 1)); 981 Assert.assertFalse(store.throttleCompaction(anyValue)); 982 } 983 984 public static class DummyStoreEngine extends DefaultStoreEngine { 985 public static DefaultCompactor lastCreatedCompactor = null; 986 @Override createComponents( Configuration conf, Store store, KVComparator comparator)987 protected void createComponents( 988 Configuration conf, Store store, KVComparator comparator) throws IOException { 989 super.createComponents(conf, store, comparator); 990 lastCreatedCompactor = this.compactor; 991 } 992 } 993 994 @Test testStoreUsesSearchEngineOverride()995 public void testStoreUsesSearchEngineOverride() throws Exception { 996 Configuration conf = HBaseConfiguration.create(); 997 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 998 init(this.name.getMethodName(), conf); 999 Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor, 1000 this.store.storeEngine.getCompactor()); 1001 } 1002 addStoreFile()1003 private void addStoreFile() throws IOException { 1004 StoreFile f = this.store.getStorefiles().iterator().next(); 1005 Path storedir = f.getPath().getParent(); 1006 long seqid = this.store.getMaxSequenceId(); 1007 Configuration c = TEST_UTIL.getConfiguration(); 1008 FileSystem fs = FileSystem.get(c); 1009 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 1010 StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c), 1011 fs) 1012 .withOutputDir(storedir) 1013 .withFileContext(fileContext) 1014 .build(); 1015 w.appendMetadata(seqid + 1, false); 1016 w.close(); 1017 LOG.info("Added store file:" + w.getPath()); 1018 } 1019 archiveStoreFile(int index)1020 private void archiveStoreFile(int index) throws IOException { 1021 Collection<StoreFile> files = this.store.getStorefiles(); 1022 StoreFile sf = null; 1023 Iterator<StoreFile> it = files.iterator(); 1024 for (int i = 0; i <= index; i++) { 1025 sf = it.next(); 1026 } 1027 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); 1028 } 1029 1030 @Test testRefreshStoreFiles()1031 public void testRefreshStoreFiles() throws Exception { 1032 init(name.getMethodName()); 1033 1034 assertEquals(0, this.store.getStorefilesCount()); 1035 1036 // add some data, flush 1037 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); 1038 flush(1); 1039 assertEquals(1, this.store.getStorefilesCount()); 1040 1041 // add one more file 1042 addStoreFile(); 1043 1044 assertEquals(1, this.store.getStorefilesCount()); 1045 store.refreshStoreFiles(); 1046 assertEquals(2, this.store.getStorefilesCount()); 1047 1048 // add three more files 1049 addStoreFile(); 1050 addStoreFile(); 1051 addStoreFile(); 1052 1053 assertEquals(2, this.store.getStorefilesCount()); 1054 store.refreshStoreFiles(); 1055 assertEquals(5, this.store.getStorefilesCount()); 1056 1057 archiveStoreFile(0); 1058 1059 assertEquals(5, this.store.getStorefilesCount()); 1060 store.refreshStoreFiles(); 1061 assertEquals(4, this.store.getStorefilesCount()); 1062 1063 archiveStoreFile(0); 1064 archiveStoreFile(1); 1065 archiveStoreFile(2); 1066 1067 assertEquals(4, this.store.getStorefilesCount()); 1068 store.refreshStoreFiles(); 1069 assertEquals(1, this.store.getStorefilesCount()); 1070 1071 archiveStoreFile(0); 1072 store.refreshStoreFiles(); 1073 assertEquals(0, this.store.getStorefilesCount()); 1074 } 1075 1076 @SuppressWarnings("unchecked") 1077 @Test testRefreshStoreFilesNotChanged()1078 public void testRefreshStoreFilesNotChanged() throws IOException { 1079 init(name.getMethodName()); 1080 1081 assertEquals(0, this.store.getStorefilesCount()); 1082 1083 // add some data, flush 1084 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); 1085 flush(1); 1086 // add one more file 1087 addStoreFile(); 1088 1089 HStore spiedStore = spy(store); 1090 1091 // call first time after files changed 1092 spiedStore.refreshStoreFiles(); 1093 assertEquals(2, this.store.getStorefilesCount()); 1094 verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class)); 1095 1096 // call second time 1097 spiedStore.refreshStoreFiles(); 1098 1099 //ensure that replaceStoreFiles is not called if files are not refreshed 1100 verify(spiedStore, times(0)).replaceStoreFiles(null, null); 1101 } 1102 } 1103