1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hbase.regionserver.wal;
19 
20 import static org.junit.Assert.assertFalse;
21 
22 import java.io.IOException;
23 
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.HTableDescriptor;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
35 import org.apache.hadoop.hbase.testclassification.SmallTests;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.FSTableDescriptors;
38 import org.apache.hadoop.hbase.util.FSUtils;
39 import org.apache.hadoop.hbase.wal.WAL;
40 import org.apache.hadoop.hbase.wal.WALFactory;
41 import org.apache.hadoop.hbase.wal.WALKey;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 
45 /**
46  * Test many concurrent appenders to an WAL while rolling the log.
47  */
48 @Category(SmallTests.class)
49 public class TestLogRollingNoCluster {
50   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
51   private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
52   private static final int THREAD_COUNT = 100; // Spin up this many threads
53 
54   /**
55    * Spin up a bunch of threads and have them all append to a WAL.  Roll the
56    * WAL frequently to try and trigger NPE.
57    * @throws IOException
58    * @throws InterruptedException
59    */
60   @Test
testContendedLogRolling()61   public void testContendedLogRolling() throws IOException, InterruptedException {
62     Path dir = TEST_UTIL.getDataTestDir();
63     // The implementation needs to know the 'handler' count.
64     TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
65     final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
66     FSUtils.setRootDir(conf, dir);
67     final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
68     final WAL wal = wals.getWAL(new byte[]{});
69 
70     Appender [] appenders = null;
71 
72     final int count = THREAD_COUNT;
73     appenders = new Appender[count];
74     try {
75       for (int i = 0; i < count; i++) {
76         // Have each appending thread write 'count' entries
77         appenders[i] = new Appender(wal, i, count);
78       }
79       for (int i = 0; i < count; i++) {
80         appenders[i].start();
81       }
82       for (int i = 0; i < count; i++) {
83         //ensure that all threads are joined before closing the wal
84         appenders[i].join();
85       }
86     } finally {
87       wals.close();
88     }
89     for (int i = 0; i < count; i++) {
90       assertFalse(appenders[i].isException());
91     }
92   }
93 
94   /**
95    * Appender thread.  Appends to passed wal file.
96    */
97   static class Appender extends Thread {
98     private final Log log;
99     private final WAL wal;
100     private final int count;
101     private Exception e = null;
102 
Appender(final WAL wal, final int index, final int count)103     Appender(final WAL wal, final int index, final int count) {
104       super("" + index);
105       this.wal = wal;
106       this.count = count;
107       this.log = LogFactory.getLog("Appender:" + getName());
108     }
109 
110     /**
111      * @return Call when the thread is done.
112      */
isException()113     boolean isException() {
114       return !isAlive() && this.e != null;
115     }
116 
getException()117     Exception getException() {
118       return this.e;
119     }
120 
121     @Override
run()122     public void run() {
123       this.log.info(getName() +" started");
124       final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
125       try {
126         for (int i = 0; i < this.count; i++) {
127           long now = System.currentTimeMillis();
128           // Roll every ten edits
129           if (i % 10 == 0) {
130             this.wal.rollWriter();
131           }
132           WALEdit edit = new WALEdit();
133           byte[] bytes = Bytes.toBytes(i);
134           edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
135           final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
136           final FSTableDescriptors fts = new FSTableDescriptors(TEST_UTIL.getConfiguration());
137           final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
138           final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
139               TableName.META_TABLE_NAME, now, mvcc), edit, true);
140           wal.sync(txid);
141         }
142         String msg = getName() + " finished";
143         if (isException())
144           this.log.info(msg, getException());
145         else
146           this.log.info(msg);
147       } catch (Exception e) {
148         this.e = e;
149         log.info("Caught exception from Appender:" + getName(), e);
150       } finally {
151         // Call sync on our log.else threads just hang out.
152         try {
153           this.wal.sync();
154         } catch (IOException e) {
155           throw new RuntimeException(e);
156         }
157       }
158     }
159   }
160 
161   //@org.junit.Rule
162   //public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
163   //  new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
164 }
165