1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hbase.regionserver;
19 
20 import static org.junit.Assert.assertEquals;
21 
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.ThreadLocalRandom;
28 
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.hbase.CategoryBasedTimeout;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.client.Durability;
38 import org.apache.hadoop.hbase.client.Increment;
39 import org.apache.hadoop.hbase.client.Scan;
40 import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
41 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
42 import org.apache.hadoop.hbase.testclassification.MediumTests;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.wal.WAL;
45 import org.junit.After;
46 import org.junit.Rule;
47 import org.junit.Test;
48 import org.junit.experimental.categories.Category;
49 import org.junit.rules.TestName;
50 import org.junit.rules.TestRule;
51 
52 
53 /**
54  * Increments with some concurrency against a region to ensure we get the right answer.
55  * Test is parameterized to run the fast and slow path increments; if fast,
56  * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true.
57  *
58  * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads
59  * doing increments across two column families all on one row and the increments are connected to
60  * prove atomicity on row.
61  */
62 @Category(MediumTests.class)
63 public class TestRegionIncrement {
64   private static final Log LOG = LogFactory.getLog(TestRegionIncrement.class);
65   @Rule public TestName name = new TestName();
66   @Rule public final TestRule timeout =
67       CategoryBasedTimeout.builder().withTimeout(this.getClass()).
68         withLookingForStuckThread(true).build();
69   private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
70   private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment");
71   private static final int THREAD_COUNT = 10;
72   private static final int INCREMENT_COUNT = 10000;
data()73   public static Collection<Object []> data() {
74     return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE});
75   }
76 
77   @After
tearDown()78   public void tearDown() throws Exception {
79     TEST_UTIL.cleanupTestDir();
80   }
81 
getRegion(final Configuration conf, final String tableName)82   private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
83     WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
84       TEST_UTIL.getDataTestDir().toString(), conf);
85     return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
86       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
87       false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
88   }
89 
closeRegion(final HRegion region)90   private void closeRegion(final HRegion region) throws IOException {
91     region.close();
92     region.getWAL().close();
93   }
94 
95   /**
96    * Increments a single cell a bunch of times.
97    */
98   private static class SingleCellIncrementer extends Thread {
99     private final int count;
100     private final HRegion region;
101     private final Increment increment;
102 
SingleCellIncrementer(final int i, final int count, final HRegion region, final Increment increment)103     SingleCellIncrementer(final int i, final int count, final HRegion region,
104         final Increment increment) {
105       super("" + i);
106       setDaemon(true);
107       this.count = count;
108       this.region = region;
109       this.increment = increment;
110     }
111 
112     @Override
run()113     public void run() {
114       for (int i = 0; i < this.count; i++) {
115         try {
116           this.region.increment(this.increment);
117           // LOG.info(getName() + " " + i);
118         } catch (IOException e) {
119           throw new RuntimeException(e);
120         }
121       }
122     }
123   }
124 
125   /**
126    * Increments a random row's Cell <code>count</code> times.
127    */
128   private static class CrossRowCellIncrementer extends Thread {
129     private final int count;
130     private final HRegion region;
131     private final Increment [] increments;
132 
CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range)133     CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
134       super("" + i);
135       setDaemon(true);
136       this.count = count;
137       this.region = region;
138       this.increments = new Increment[range];
139       for (int ii = 0; ii < range; ii++) {
140         this.increments[ii] = new Increment(Bytes.toBytes(i));
141         this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
142       }
143     }
144 
145     @Override
run()146     public void run() {
147       for (int i = 0; i < this.count; i++) {
148         try {
149           int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
150           this.region.increment(this.increments[index]);
151           // LOG.info(getName() + " " + index);
152         } catch (IOException e) {
153           throw new RuntimeException(e);
154         }
155       }
156     }
157   }
158 
159   /**
160    * Have each thread update its own Cell. Avoid contention with another thread.
161    * @throws IOException
162    * @throws InterruptedException
163    */
164   @Test
testUnContendedSingleCellIncrement()165   public void testUnContendedSingleCellIncrement()
166   throws IOException, InterruptedException {
167     final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
168         TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
169     long startTime = System.currentTimeMillis();
170     try {
171       SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT];
172       for (int i = 0; i < threads.length; i++) {
173         byte [] rowBytes = Bytes.toBytes(i);
174         Increment increment = new Increment(rowBytes);
175         increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
176         threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
177       }
178       for (int i = 0; i < threads.length; i++) {
179         threads[i].start();
180       }
181       for (int i = 0; i < threads.length; i++) {
182         threads[i].join();
183       }
184       RegionScanner regionScanner = region.getScanner(new Scan());
185       List<Cell> cells = new ArrayList<Cell>(THREAD_COUNT);
186       while(regionScanner.next(cells)) continue;
187       assertEquals(THREAD_COUNT, cells.size());
188       long total = 0;
189       for (Cell cell: cells) total += Bytes.toLong(cell.getValue());
190       assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
191     } finally {
192       closeRegion(region);
193       LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
194     }
195   }
196 
197   /**
198    * Have each thread update its own Cell. Avoid contention with another thread.
199    * @throws IOException
200    * @throws InterruptedException
201    */
202   @Test
testContendedAcrossCellsIncrement()203   public void testContendedAcrossCellsIncrement()
204   throws IOException, InterruptedException {
205     final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
206         TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
207     long startTime = System.currentTimeMillis();
208     try {
209       CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT];
210       for (int i = 0; i < threads.length; i++) {
211         threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
212       }
213       for (int i = 0; i < threads.length; i++) {
214         threads[i].start();
215       }
216       for (int i = 0; i < threads.length; i++) {
217         threads[i].join();
218       }
219       RegionScanner regionScanner = region.getScanner(new Scan());
220       List<Cell> cells = new ArrayList<Cell>(100);
221       while(regionScanner.next(cells)) continue;
222       assertEquals(THREAD_COUNT, cells.size());
223       long total = 0;
224       for (Cell cell: cells) total += Bytes.toLong(cell.getValue());
225       assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
226     } finally {
227       closeRegion(region);
228       LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
229     }
230   }
231 }