/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;


/**
 * Increments with some concurrency against a region to ensure we get the right answer.
 *
 * <p>NOTE(review): earlier documentation described this test as parameterized over the fast and
 * slow increment paths (HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY), but no Parameterized
 * runner, {@code @Parameters} annotation or parameter-taking constructor is present in this
 * class. {@link #data()} is retained for compatibility only -- confirm whether it is still used.
 *
 * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads
 * doing increments across two column families all on one row and the increments are connected to
 * prove atomicity on row.
 */
@Category(MediumTests.class)
public class TestRegionIncrement {
  private static final Log LOG = LogFactory.getLog(TestRegionIncrement.class);
  @Rule public TestName name = new TestName();
  @Rule public final TestRule timeout =
      CategoryBasedTimeout.builder().withTimeout(this.getClass()).
          withLookingForStuckThread(true).build();
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  /** Used as both the column family and the qualifier of every incremented cell. */
  private static final byte [] INCREMENT_BYTES = Bytes.toBytes("increment");
  /** Number of incrementing threads started by each test. */
  private static final int THREAD_COUNT = 10;
  /** Number of increments each thread performs. */
  private static final int INCREMENT_COUNT = 10000;

  /**
   * @return two single-element parameter sets, {@code FALSE} then {@code TRUE}. Nothing in this
   *   class consumes them; see the class comment.
   */
  public static Collection<Object []> data() {
    return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE});
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.cleanupTestDir();
  }

  /**
   * Creates a local region with a single {@link #INCREMENT_BYTES} family, backed by a fresh
   * {@link FSHLog} rooted at the test data directory. Callers must release it with
   * {@link #closeRegion(HRegion)}.
   * @param conf configuration used for the filesystem, the WAL and the region
   * @param tableName name of the table the region belongs to
   * @return the newly created region
   * @throws IOException if the WAL or the region cannot be created
   */
  private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
    WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
      TEST_UTIL.getDataTestDir().toString(), conf);
    return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
      false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
  }

  /**
   * Closes the region and then its WAL.
   * @param region region obtained from {@link #getRegion(Configuration, String)}
   * @throws IOException if closing the region or the WAL fails
   */
  private void closeRegion(final HRegion region) throws IOException {
    try {
      region.close();
    } finally {
      // Close the WAL even when closing the region fails so it is not leaked.
      region.getWAL().close();
    }
  }

  /**
   * Starts every thread, then blocks until all of them have finished.
   * @param threads workers to run; none may have been started yet
   * @throws InterruptedException if the calling thread is interrupted while joining
   */
  private static void startAndJoin(final Thread [] threads) throws InterruptedException {
    for (Thread thread : threads) {
      thread.start();
    }
    for (Thread thread : threads) {
      thread.join();
    }
  }

  /**
   * Scans the whole region and asserts that it holds exactly {@link #THREAD_COUNT} cells whose
   * long values add up to {@code INCREMENT_COUNT * THREAD_COUNT}.
   * @param region region to verify
   * @throws IOException if scanning fails
   */
  private static void assertExpectedCellsAndTotal(final HRegion region) throws IOException {
    List<Cell> cells = new ArrayList<Cell>(THREAD_COUNT);
    RegionScanner scanner = region.getScanner(new Scan());
    try {
      // Each call appends to 'cells'; keep going for as long as the scanner reports more.
      while (scanner.next(cells)) {
        continue;
      }
    } finally {
      scanner.close();
    }
    assertEquals(THREAD_COUNT, cells.size());
    long total = 0;
    for (Cell cell : cells) {
      // Decode straight from the backing array rather than copying via deprecated getValue().
      total += Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
    }
    // Multiply as long so larger constants cannot silently overflow int.
    assertEquals((long) INCREMENT_COUNT * THREAD_COUNT, total);
  }

  /**
   * Increments a single cell a bunch of times.
   */
  private static class SingleCellIncrementer extends Thread {
    private final int count;
    private final HRegion region;
    private final Increment increment;

    /**
     * @param i index of this worker; its decimal form becomes the thread name
     * @param count how many times to apply the increment
     * @param region region to increment against
     * @param increment the increment applied on every iteration
     */
    SingleCellIncrementer(final int i, final int count, final HRegion region,
        final Increment increment) {
      super("" + i);
      setDaemon(true);
      this.count = count;
      this.region = region;
      this.increment = increment;
    }

    @Override
    public void run() {
      for (int i = 0; i < this.count; i++) {
        try {
          this.region.increment(this.increment);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  /**
   * Increments a random row's Cell <code>count</code> times.
   */
  private static class CrossRowCellIncrementer extends Thread {
    private final int count;
    private final HRegion region;
    private final Increment [] increments;

    /**
     * @param i index of this worker; its decimal form becomes the thread name
     * @param count how many increments to perform
     * @param region region to increment against
     * @param range number of distinct rows (keyed 0 to {@code range - 1}) to pick from
     */
    CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
      super("" + i);
      setDaemon(true);
      this.count = count;
      this.region = region;
      this.increments = new Increment[range];
      for (int ii = 0; ii < range; ii++) {
        // Key each slot by its own index so the array spans 'range' distinct rows. Keying by the
        // worker index 'i' would make every slot target the same row, and workers would then
        // never touch each other's rows.
        this.increments[ii] = new Increment(Bytes.toBytes(ii));
        this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
      }
    }

    @Override
    public void run() {
      for (int i = 0; i < this.count; i++) {
        try {
          int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
          this.region.increment(this.increments[index]);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  /**
   * Have each thread update its own Cell. Avoid contention with another thread.
   * @throws IOException if the region cannot be created, scanned or closed
   * @throws InterruptedException if interrupted while waiting for the worker threads
   */
  @Test
  public void testUnContendedSingleCellIncrement()
  throws IOException, InterruptedException {
    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
      TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
    long startTime = System.currentTimeMillis();
    try {
      SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT];
      for (int i = 0; i < threads.length; i++) {
        // One row per worker, keyed by the worker index.
        Increment increment = new Increment(Bytes.toBytes(i));
        increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
        threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
      }
      startAndJoin(threads);
      assertExpectedCellsAndTotal(region);
    } finally {
      closeRegion(region);
      LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
    }
  }

  /**
   * Have every thread increment randomly chosen rows out of a shared set of
   * {@link #THREAD_COUNT} rows, so that threads contend with one another on the same Cells.
   * @throws IOException if the region cannot be created, scanned or closed
   * @throws InterruptedException if interrupted while waiting for the worker threads
   */
  @Test
  public void testContendedAcrossCellsIncrement()
  throws IOException, InterruptedException {
    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
      TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
    long startTime = System.currentTimeMillis();
    try {
      CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT];
      for (int i = 0; i < threads.length; i++) {
        threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
      }
      startAndJoin(threads);
      assertExpectedCellsAndTotal(region);
    } finally {
      closeRegion(region);
      LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
    }
  }
}