1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with this 4 * work for additional information regarding copyright ownership. The ASF 5 * licenses this file to you under the Apache License, Version 2.0 (the 6 * "License"); you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14 * License for the specific language governing permissions and limitations 15 * under the License. 16 */ 17 package org.apache.hadoop.hbase.util; 18 19 import java.io.IOException; 20 import java.util.Arrays; 21 import java.util.HashSet; 22 import java.util.Set; 23 import java.util.concurrent.atomic.AtomicLong; 24 25 import org.apache.commons.lang.math.RandomUtils; 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 import org.apache.hadoop.conf.Configuration; 29 import org.apache.hadoop.hbase.HRegionLocation; 30 import org.apache.hadoop.hbase.TableName; 31 import org.apache.hadoop.hbase.client.Get; 32 33 import org.apache.hadoop.hbase.client.Consistency; 34 import org.apache.hadoop.hbase.client.HTableInterface; 35 import org.apache.hadoop.hbase.client.Result; 36 import org.apache.hadoop.hbase.client.Table; 37 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 38 39 /** Creates multiple threads that read and verify previously written data */ 40 public class MultiThreadedReader extends MultiThreadedAction 41 { 42 private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class); 43 44 protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>(); 45 private final double verifyPercent; 46 protected volatile boolean aborted; 47 48 protected MultiThreadedWriterBase writer = null; 49 50 /** 51 * The number of keys verified in a sequence. This will never be larger than 52 * the total number of keys in the range. The reader might also verify 53 * random keys when it catches up with the writer. 54 */ 55 private final AtomicLong numUniqueKeysVerified = new AtomicLong(); 56 57 /** 58 * Default maximum number of read errors to tolerate before shutting down all 59 * readers. 60 */ 61 public static final int DEFAULT_MAX_ERRORS = 10; 62 63 /** 64 * Default "window" size between the last key written by the writer and the 65 * key that we attempt to read. The lower this number, the stricter our 66 * testing is. If this is zero, we always attempt to read the highest key 67 * in the contiguous sequence of keys written by the writers. 68 */ 69 public static final int DEFAULT_KEY_WINDOW = 0; 70 71 /** 72 * Default batch size for multigets 73 */ 74 public static final int DEFAULT_BATCH_SIZE = 1; //translates to simple GET (no multi GET) 75 76 protected AtomicLong numKeysVerified = new AtomicLong(0); 77 protected AtomicLong numReadErrors = new AtomicLong(0); 78 protected AtomicLong numReadFailures = new AtomicLong(0); 79 protected AtomicLong nullResult = new AtomicLong(0); 80 private int maxErrors = DEFAULT_MAX_ERRORS; 81 private int keyWindow = DEFAULT_KEY_WINDOW; 82 private int batchSize = DEFAULT_BATCH_SIZE; 83 private int regionReplicaId = -1; // particular region replica id to do reads against if set 84 MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, double verifyPercent)85 public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, 86 TableName tableName, double verifyPercent) throws IOException { 87 super(dataGen, conf, tableName, "R"); 88 this.verifyPercent = verifyPercent; 89 } 90 linkToWriter(MultiThreadedWriterBase writer)91 public void linkToWriter(MultiThreadedWriterBase writer) { 92 this.writer = writer; 93 writer.setTrackWroteKeys(true); 94 } 95 setMaxErrors(int maxErrors)96 public void setMaxErrors(int maxErrors) { 97 this.maxErrors = maxErrors; 98 } 99 setKeyWindow(int keyWindow)100 public void setKeyWindow(int keyWindow) { 101 this.keyWindow = keyWindow; 102 } 103 setMultiGetBatchSize(int batchSize)104 public void setMultiGetBatchSize(int batchSize) { 105 this.batchSize = batchSize; 106 } 107 setRegionReplicaId(int regionReplicaId)108 public void setRegionReplicaId(int regionReplicaId) { 109 this.regionReplicaId = regionReplicaId; 110 } 111 112 @Override start(long startKey, long endKey, int numThreads)113 public void start(long startKey, long endKey, int numThreads) throws IOException { 114 super.start(startKey, endKey, numThreads); 115 if (verbose) { 116 LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); 117 } 118 119 addReaderThreads(numThreads); 120 startThreads(readers); 121 } 122 addReaderThreads(int numThreads)123 protected void addReaderThreads(int numThreads) throws IOException { 124 for (int i = 0; i < numThreads; ++i) { 125 HBaseReaderThread reader = createReaderThread(i); 126 readers.add(reader); 127 } 128 } 129 createReaderThread(int readerId)130 protected HBaseReaderThread createReaderThread(int readerId) throws IOException { 131 HBaseReaderThread reader = new HBaseReaderThread(readerId); 132 Threads.setLoggingUncaughtExceptionHandler(reader); 133 return reader; 134 } 135 136 public class HBaseReaderThread extends Thread { 137 protected final int readerId; 138 protected final Table table; 139 140 /** The "current" key being read. Increases from startKey to endKey. */ 141 private long curKey; 142 143 /** Time when the thread started */ 144 protected long startTimeMs; 145 146 /** If we are ahead of the writer and reading a random key. */ 147 private boolean readingRandomKey; 148 149 private boolean printExceptionTrace = true; 150 151 /** 152 * @param readerId only the keys with this remainder from division by 153 * {@link #numThreads} will be read by this thread 154 */ HBaseReaderThread(int readerId)155 public HBaseReaderThread(int readerId) throws IOException { 156 this.readerId = readerId; 157 table = createTable(); 158 setName(getClass().getSimpleName() + "_" + readerId); 159 } 160 createTable()161 protected HTableInterface createTable() throws IOException { 162 return connection.getTable(tableName); 163 } 164 165 @Override run()166 public void run() { 167 try { 168 runReader(); 169 } finally { 170 closeTable(); 171 numThreadsWorking.decrementAndGet(); 172 } 173 } 174 closeTable()175 protected void closeTable() { 176 try { 177 if (table != null) { 178 table.close(); 179 } 180 } catch (IOException e) { 181 LOG.error("Error closing table", e); 182 } 183 } 184 runReader()185 private void runReader() { 186 if (verbose) { 187 LOG.info("Started thread #" + readerId + " for reads..."); 188 } 189 190 startTimeMs = System.currentTimeMillis(); 191 curKey = startKey; 192 long [] keysForThisReader = new long[batchSize]; 193 while (curKey < endKey && !aborted) { 194 int readingRandomKeyStartIndex = -1; 195 int numKeys = 0; 196 // if multiGet, loop until we have the number of keys equal to the batch size 197 do { 198 long k = getNextKeyToRead(); 199 if (k < startKey || k >= endKey) { 200 numReadErrors.incrementAndGet(); 201 throw new AssertionError("Load tester logic error: proposed key " + 202 "to read " + k + " is out of range (startKey=" + startKey + 203 ", endKey=" + endKey + ")"); 204 } 205 if (k % numThreads != readerId || 206 writer != null && writer.failedToWriteKey(k)) { 207 // Skip keys that this thread should not read, as well as the keys 208 // that we know the writer failed to write. 209 continue; 210 } 211 keysForThisReader[numKeys] = k; 212 if (readingRandomKey && readingRandomKeyStartIndex == -1) { 213 //store the first index of a random read 214 readingRandomKeyStartIndex = numKeys; 215 } 216 numKeys++; 217 } while (numKeys < batchSize && curKey < endKey && !aborted); 218 219 if (numKeys > 0) { //meaning there is some key to read 220 readKey(keysForThisReader); 221 // We have verified some unique key(s). 222 numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ? 223 numKeys : readingRandomKeyStartIndex); 224 } 225 } 226 } 227 228 /** 229 * Should only be used for the concurrent writer/reader workload. The 230 * maximum key we are allowed to read, subject to the "key window" 231 * constraint. 232 */ maxKeyWeCanRead()233 private long maxKeyWeCanRead() { 234 long insertedUpToKey = writer.wroteUpToKey(); 235 if (insertedUpToKey >= endKey - 1) { 236 // The writer has finished writing our range, so we can read any 237 // key in the range. 238 return endKey - 1; 239 } 240 return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow); 241 } 242 getNextKeyToRead()243 protected long getNextKeyToRead() { 244 readingRandomKey = false; 245 if (writer == null || curKey <= maxKeyWeCanRead()) { 246 return curKey++; 247 } 248 249 // We caught up with the writer. See if we can read any keys at all. 250 long maxKeyToRead; 251 while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) { 252 // The writer has not written sufficient keys for us to be able to read 253 // anything at all. Sleep a bit. This should only happen in the 254 // beginning of a load test run. 255 Threads.sleepWithoutInterrupt(50); 256 } 257 258 if (curKey <= maxKeyToRead) { 259 // The writer wrote some keys, and we are now allowed to read our 260 // current key. 261 return curKey++; 262 } 263 264 // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys. 265 // Don't increment the current key -- we still have to try reading it 266 // later. Set a flag to make sure that we don't count this key towards 267 // the set of unique keys we have verified. 268 readingRandomKey = true; 269 return startKey + Math.abs(RandomUtils.nextLong()) 270 % (maxKeyToRead - startKey + 1); 271 } 272 readKey(long[] keysToRead)273 private Get[] readKey(long[] keysToRead) { 274 Get [] gets = new Get[keysToRead.length]; 275 int i = 0; 276 for (long keyToRead : keysToRead) { 277 try { 278 gets[i] = createGet(keyToRead); 279 if (keysToRead.length == 1) { 280 queryKey(gets[i], RandomUtils.nextInt(100) < verifyPercent, keyToRead); 281 } 282 i++; 283 } catch (IOException e) { 284 numReadFailures.addAndGet(1); 285 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") 286 + ", time from start: " 287 + (System.currentTimeMillis() - startTimeMs) + " ms"); 288 if (printExceptionTrace) { 289 LOG.warn(e); 290 printExceptionTrace = false; 291 } 292 } 293 } 294 if (keysToRead.length > 1) { 295 try { 296 queryKey(gets, RandomUtils.nextInt(100) < verifyPercent, keysToRead); 297 } catch (IOException e) { 298 numReadFailures.addAndGet(gets.length); 299 for (long keyToRead : keysToRead) { 300 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") 301 + ", time from start: " 302 + (System.currentTimeMillis() - startTimeMs) + " ms"); 303 } 304 if (printExceptionTrace) { 305 LOG.warn(e); 306 printExceptionTrace = false; 307 } 308 } 309 } 310 return gets; 311 } 312 createGet(long keyToRead)313 protected Get createGet(long keyToRead) throws IOException { 314 Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead)); 315 String cfsString = ""; 316 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 317 for (byte[] cf : columnFamilies) { 318 get.addFamily(cf); 319 if (verbose) { 320 if (cfsString.length() > 0) { 321 cfsString += ", "; 322 } 323 cfsString += "[" + Bytes.toStringBinary(cf) + "]"; 324 } 325 } 326 get = dataGenerator.beforeGet(keyToRead, get); 327 if (regionReplicaId > 0) { 328 get.setReplicaId(regionReplicaId); 329 get.setConsistency(Consistency.TIMELINE); 330 } 331 if (verbose) { 332 LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); 333 } 334 return get; 335 } 336 queryKey(Get[] gets, boolean verify, long[] keysToRead)337 public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException { 338 // read the data 339 long start = System.nanoTime(); 340 // Uses multi/batch gets 341 Result[] results = table.get(Arrays.asList(gets)); 342 long end = System.nanoTime(); 343 verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false); 344 } 345 queryKey(Get get, boolean verify, long keyToRead)346 public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { 347 // read the data 348 349 long start = System.nanoTime(); 350 // Uses simple get 351 Result result = table.get(get); 352 long end = System.nanoTime(); 353 verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false); 354 } 355 verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano, Result[] results, Table table, boolean isNullExpected)356 protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano, 357 Result[] results, Table table, boolean isNullExpected) 358 throws IOException { 359 totalOpTimeMs.addAndGet(elapsedNano / 1000000); 360 numKeys.addAndGet(gets.length); 361 int i = 0; 362 for (Result result : results) { 363 verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table, 364 isNullExpected); 365 } 366 } 367 verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano, Result result, Table table, boolean isNullExpected)368 protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano, 369 Result result, Table table, boolean isNullExpected) 370 throws IOException { 371 verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano, 372 new Result[]{result}, table, isNullExpected); 373 } 374 verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result, Table table, boolean isNullExpected)375 private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, 376 Result result, Table table, boolean isNullExpected) throws IOException { 377 if (!result.isEmpty()) { 378 if (verify) { 379 numKeysVerified.incrementAndGet(); 380 } 381 } else { 382 HRegionLocation hloc = connection.getRegionLocation(tableName, 383 get.getRow(), false); 384 String rowKey = Bytes.toString(get.getRow()); 385 LOG.info("Key = " + rowKey + ", Region location: " + hloc); 386 if(isNullExpected) { 387 nullResult.incrementAndGet(); 388 LOG.debug("Null result obtained for the key ="+rowKey); 389 return; 390 } 391 } 392 boolean isOk = verifyResultAgainstDataGenerator(result, verify, false); 393 long numErrorsAfterThis = 0; 394 if (isOk) { 395 long cols = 0; 396 // Count the columns for reporting purposes. 397 for (byte[] cf : result.getMap().keySet()) { 398 cols += result.getFamilyMap(cf).size(); 399 } 400 numCols.addAndGet(cols); 401 } else { 402 if (writer != null) { 403 LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys"); 404 } 405 numErrorsAfterThis = numReadErrors.incrementAndGet(); 406 } 407 408 if (numErrorsAfterThis > maxErrors) { 409 LOG.error("Aborting readers -- found more than " + maxErrors + " errors"); 410 aborted = true; 411 } 412 } 413 } 414 getNumReadFailures()415 public long getNumReadFailures() { 416 return numReadFailures.get(); 417 } 418 getNumReadErrors()419 public long getNumReadErrors() { 420 return numReadErrors.get(); 421 } 422 getNumKeysVerified()423 public long getNumKeysVerified() { 424 return numKeysVerified.get(); 425 } 426 getNumUniqueKeysVerified()427 public long getNumUniqueKeysVerified() { 428 return numUniqueKeysVerified.get(); 429 } 430 getNullResultsCount()431 public long getNullResultsCount() { 432 return nullResult.get(); 433 } 434 435 @Override progressInfo()436 protected String progressInfo() { 437 StringBuilder sb = new StringBuilder(); 438 appendToStatus(sb, "verified", numKeysVerified.get()); 439 appendToStatus(sb, "READ FAILURES", numReadFailures.get()); 440 appendToStatus(sb, "READ ERRORS", numReadErrors.get()); 441 appendToStatus(sb, "NULL RESULT", nullResult.get()); 442 return sb.toString(); 443 } 444 } 445