1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.regionserver.wal; 19 20 import static org.junit.Assert.assertFalse; 21 22 import java.io.IOException; 23 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 import org.apache.hadoop.conf.Configuration; 27 import org.apache.hadoop.fs.Path; 28 import org.apache.hadoop.hbase.HBaseTestingUtility; 29 import org.apache.hadoop.hbase.HConstants; 30 import org.apache.hadoop.hbase.HRegionInfo; 31 import org.apache.hadoop.hbase.HTableDescriptor; 32 import org.apache.hadoop.hbase.KeyValue; 33 import org.apache.hadoop.hbase.TableName; 34 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 35 import org.apache.hadoop.hbase.testclassification.SmallTests; 36 import org.apache.hadoop.hbase.util.Bytes; 37 import org.apache.hadoop.hbase.util.FSTableDescriptors; 38 import org.apache.hadoop.hbase.util.FSUtils; 39 import org.apache.hadoop.hbase.wal.WAL; 40 import org.apache.hadoop.hbase.wal.WALFactory; 41 import org.apache.hadoop.hbase.wal.WALKey; 42 import org.junit.Test; 43 import org.junit.experimental.categories.Category; 44 45 /** 46 * Test many concurrent appenders to an WAL while rolling the log. 47 */ 48 @Category(SmallTests.class) 49 public class TestLogRollingNoCluster { 50 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 51 private final static byte [] EMPTY_1K_ARRAY = new byte[1024]; 52 private static final int THREAD_COUNT = 100; // Spin up this many threads 53 54 /** 55 * Spin up a bunch of threads and have them all append to a WAL. Roll the 56 * WAL frequently to try and trigger NPE. 57 * @throws IOException 58 * @throws InterruptedException 59 */ 60 @Test testContendedLogRolling()61 public void testContendedLogRolling() throws IOException, InterruptedException { 62 Path dir = TEST_UTIL.getDataTestDir(); 63 // The implementation needs to know the 'handler' count. 64 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT); 65 final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 66 FSUtils.setRootDir(conf, dir); 67 final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName()); 68 final WAL wal = wals.getWAL(new byte[]{}); 69 70 Appender [] appenders = null; 71 72 final int count = THREAD_COUNT; 73 appenders = new Appender[count]; 74 try { 75 for (int i = 0; i < count; i++) { 76 // Have each appending thread write 'count' entries 77 appenders[i] = new Appender(wal, i, count); 78 } 79 for (int i = 0; i < count; i++) { 80 appenders[i].start(); 81 } 82 for (int i = 0; i < count; i++) { 83 //ensure that all threads are joined before closing the wal 84 appenders[i].join(); 85 } 86 } finally { 87 wals.close(); 88 } 89 for (int i = 0; i < count; i++) { 90 assertFalse(appenders[i].isException()); 91 } 92 } 93 94 /** 95 * Appender thread. Appends to passed wal file. 96 */ 97 static class Appender extends Thread { 98 private final Log log; 99 private final WAL wal; 100 private final int count; 101 private Exception e = null; 102 Appender(final WAL wal, final int index, final int count)103 Appender(final WAL wal, final int index, final int count) { 104 super("" + index); 105 this.wal = wal; 106 this.count = count; 107 this.log = LogFactory.getLog("Appender:" + getName()); 108 } 109 110 /** 111 * @return Call when the thread is done. 112 */ isException()113 boolean isException() { 114 return !isAlive() && this.e != null; 115 } 116 getException()117 Exception getException() { 118 return this.e; 119 } 120 121 @Override run()122 public void run() { 123 this.log.info(getName() +" started"); 124 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 125 try { 126 for (int i = 0; i < this.count; i++) { 127 long now = System.currentTimeMillis(); 128 // Roll every ten edits 129 if (i % 10 == 0) { 130 this.wal.rollWriter(); 131 } 132 WALEdit edit = new WALEdit(); 133 byte[] bytes = Bytes.toBytes(i); 134 edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); 135 final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO; 136 final FSTableDescriptors fts = new FSTableDescriptors(TEST_UTIL.getConfiguration()); 137 final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME); 138 final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), 139 TableName.META_TABLE_NAME, now, mvcc), edit, true); 140 wal.sync(txid); 141 } 142 String msg = getName() + " finished"; 143 if (isException()) 144 this.log.info(msg, getException()); 145 else 146 this.log.info(msg); 147 } catch (Exception e) { 148 this.e = e; 149 log.info("Caught exception from Appender:" + getName(), e); 150 } finally { 151 // Call sync on our log.else threads just hang out. 152 try { 153 this.wal.sync(); 154 } catch (IOException e) { 155 throw new RuntimeException(e); 156 } 157 } 158 } 159 } 160 161 //@org.junit.Rule 162 //public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = 163 // new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); 164 } 165