1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with this
4  * work for additional information regarding copyright ownership. The ASF
5  * licenses this file to you under the Apache License, Version 2.0 (the
6  * "License"); you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations
15  * under the License.
16  */
17 package org.apache.hadoop.hbase.util;
18 
19 import java.io.IOException;
20 import java.util.Arrays;
21 import java.util.HashSet;
22 import java.util.Set;
23 import java.util.concurrent.atomic.AtomicLong;
24 
25 import org.apache.commons.lang.math.RandomUtils;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HRegionLocation;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.Get;
32 
33 import org.apache.hadoop.hbase.client.Consistency;
34 import org.apache.hadoop.hbase.client.HTableInterface;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38 
39 /** Creates multiple threads that read and verify previously written data */
40 public class MultiThreadedReader extends MultiThreadedAction
41 {
42   private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
43 
44   protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
45   private final double verifyPercent;
46   protected volatile boolean aborted;
47 
48   protected MultiThreadedWriterBase writer = null;
49 
50   /**
51    * The number of keys verified in a sequence. This will never be larger than
52    * the total number of keys in the range. The reader might also verify
53    * random keys when it catches up with the writer.
54    */
55   private final AtomicLong numUniqueKeysVerified = new AtomicLong();
56 
57   /**
58    * Default maximum number of read errors to tolerate before shutting down all
59    * readers.
60    */
61   public static final int DEFAULT_MAX_ERRORS = 10;
62 
63   /**
64    * Default "window" size between the last key written by the writer and the
65    * key that we attempt to read. The lower this number, the stricter our
66    * testing is. If this is zero, we always attempt to read the highest key
67    * in the contiguous sequence of keys written by the writers.
68    */
69   public static final int DEFAULT_KEY_WINDOW = 0;
70 
71   /**
72    * Default batch size for multigets
73    */
74   public static final int DEFAULT_BATCH_SIZE = 1; //translates to simple GET (no multi GET)
75 
76   protected AtomicLong numKeysVerified = new AtomicLong(0);
77   protected AtomicLong numReadErrors = new AtomicLong(0);
78   protected AtomicLong numReadFailures = new AtomicLong(0);
79   protected AtomicLong nullResult = new AtomicLong(0);
80   private int maxErrors = DEFAULT_MAX_ERRORS;
81   private int keyWindow = DEFAULT_KEY_WINDOW;
82   private int batchSize = DEFAULT_BATCH_SIZE;
83   private int regionReplicaId = -1; // particular region replica id to do reads against if set
84 
MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, double verifyPercent)85   public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
86       TableName tableName, double verifyPercent) throws IOException {
87     super(dataGen, conf, tableName, "R");
88     this.verifyPercent = verifyPercent;
89   }
90 
linkToWriter(MultiThreadedWriterBase writer)91   public void linkToWriter(MultiThreadedWriterBase writer) {
92     this.writer = writer;
93     writer.setTrackWroteKeys(true);
94   }
95 
setMaxErrors(int maxErrors)96   public void setMaxErrors(int maxErrors) {
97     this.maxErrors = maxErrors;
98   }
99 
setKeyWindow(int keyWindow)100   public void setKeyWindow(int keyWindow) {
101     this.keyWindow = keyWindow;
102   }
103 
setMultiGetBatchSize(int batchSize)104   public void setMultiGetBatchSize(int batchSize) {
105     this.batchSize = batchSize;
106   }
107 
setRegionReplicaId(int regionReplicaId)108   public void setRegionReplicaId(int regionReplicaId) {
109     this.regionReplicaId = regionReplicaId;
110   }
111 
112   @Override
start(long startKey, long endKey, int numThreads)113   public void start(long startKey, long endKey, int numThreads) throws IOException {
114     super.start(startKey, endKey, numThreads);
115     if (verbose) {
116       LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
117     }
118 
119     addReaderThreads(numThreads);
120     startThreads(readers);
121   }
122 
addReaderThreads(int numThreads)123   protected void addReaderThreads(int numThreads) throws IOException {
124     for (int i = 0; i < numThreads; ++i) {
125       HBaseReaderThread reader = createReaderThread(i);
126       readers.add(reader);
127     }
128   }
129 
createReaderThread(int readerId)130   protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
131     HBaseReaderThread reader = new HBaseReaderThread(readerId);
132     Threads.setLoggingUncaughtExceptionHandler(reader);
133     return reader;
134   }
135 
136   public class HBaseReaderThread extends Thread {
137     protected final int readerId;
138     protected final Table table;
139 
140     /** The "current" key being read. Increases from startKey to endKey. */
141     private long curKey;
142 
143     /** Time when the thread started */
144     protected long startTimeMs;
145 
146     /** If we are ahead of the writer and reading a random key. */
147     private boolean readingRandomKey;
148 
149     private boolean printExceptionTrace = true;
150 
151     /**
152      * @param readerId only the keys with this remainder from division by
153      *          {@link #numThreads} will be read by this thread
154      */
HBaseReaderThread(int readerId)155     public HBaseReaderThread(int readerId) throws IOException {
156       this.readerId = readerId;
157       table = createTable();
158       setName(getClass().getSimpleName() + "_" + readerId);
159     }
160 
createTable()161     protected HTableInterface createTable() throws IOException {
162       return connection.getTable(tableName);
163     }
164 
165     @Override
run()166     public void run() {
167       try {
168         runReader();
169       } finally {
170         closeTable();
171         numThreadsWorking.decrementAndGet();
172       }
173     }
174 
closeTable()175     protected void closeTable() {
176       try {
177         if (table != null) {
178           table.close();
179         }
180       } catch (IOException e) {
181         LOG.error("Error closing table", e);
182       }
183     }
184 
runReader()185     private void runReader() {
186       if (verbose) {
187         LOG.info("Started thread #" + readerId + " for reads...");
188       }
189 
190       startTimeMs = System.currentTimeMillis();
191       curKey = startKey;
192       long [] keysForThisReader = new long[batchSize];
193       while (curKey < endKey && !aborted) {
194         int readingRandomKeyStartIndex = -1;
195         int numKeys = 0;
196         // if multiGet, loop until we have the number of keys equal to the batch size
197         do {
198           long k = getNextKeyToRead();
199           if (k < startKey || k >= endKey) {
200             numReadErrors.incrementAndGet();
201             throw new AssertionError("Load tester logic error: proposed key " +
202                 "to read " + k + " is out of range (startKey=" + startKey +
203                 ", endKey=" + endKey + ")");
204           }
205           if (k % numThreads != readerId ||
206               writer != null && writer.failedToWriteKey(k)) {
207             // Skip keys that this thread should not read, as well as the keys
208             // that we know the writer failed to write.
209             continue;
210           }
211           keysForThisReader[numKeys] = k;
212           if (readingRandomKey && readingRandomKeyStartIndex == -1) {
213             //store the first index of a random read
214             readingRandomKeyStartIndex = numKeys;
215           }
216           numKeys++;
217         } while (numKeys < batchSize && curKey < endKey && !aborted);
218 
219         if (numKeys > 0) { //meaning there is some key to read
220           readKey(keysForThisReader);
221           // We have verified some unique key(s).
222           numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ?
223               numKeys : readingRandomKeyStartIndex);
224         }
225       }
226     }
227 
228     /**
229      * Should only be used for the concurrent writer/reader workload. The
230      * maximum key we are allowed to read, subject to the "key window"
231      * constraint.
232      */
maxKeyWeCanRead()233     private long maxKeyWeCanRead() {
234       long insertedUpToKey = writer.wroteUpToKey();
235       if (insertedUpToKey >= endKey - 1) {
236         // The writer has finished writing our range, so we can read any
237         // key in the range.
238         return endKey - 1;
239       }
240       return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
241     }
242 
getNextKeyToRead()243     protected long getNextKeyToRead() {
244       readingRandomKey = false;
245       if (writer == null || curKey <= maxKeyWeCanRead()) {
246         return curKey++;
247       }
248 
249       // We caught up with the writer. See if we can read any keys at all.
250       long maxKeyToRead;
251       while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
252         // The writer has not written sufficient keys for us to be able to read
253         // anything at all. Sleep a bit. This should only happen in the
254         // beginning of a load test run.
255         Threads.sleepWithoutInterrupt(50);
256       }
257 
258       if (curKey <= maxKeyToRead) {
259         // The writer wrote some keys, and we are now allowed to read our
260         // current key.
261         return curKey++;
262       }
263 
264       // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
265       // Don't increment the current key -- we still have to try reading it
266       // later. Set a flag to make sure that we don't count this key towards
267       // the set of unique keys we have verified.
268       readingRandomKey = true;
269       return startKey + Math.abs(RandomUtils.nextLong())
270           % (maxKeyToRead - startKey + 1);
271     }
272 
readKey(long[] keysToRead)273     private Get[] readKey(long[] keysToRead) {
274       Get [] gets = new Get[keysToRead.length];
275       int i = 0;
276       for (long keyToRead : keysToRead) {
277         try {
278           gets[i] = createGet(keyToRead);
279           if (keysToRead.length == 1) {
280             queryKey(gets[i], RandomUtils.nextInt(100) < verifyPercent, keyToRead);
281           }
282           i++;
283         } catch (IOException e) {
284           numReadFailures.addAndGet(1);
285           LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
286               + ", time from start: "
287               + (System.currentTimeMillis() - startTimeMs) + " ms");
288           if (printExceptionTrace) {
289             LOG.warn(e);
290             printExceptionTrace = false;
291           }
292         }
293       }
294       if (keysToRead.length > 1) {
295         try {
296           queryKey(gets, RandomUtils.nextInt(100) < verifyPercent, keysToRead);
297         } catch (IOException e) {
298           numReadFailures.addAndGet(gets.length);
299           for (long keyToRead : keysToRead) {
300             LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
301                 + ", time from start: "
302                 + (System.currentTimeMillis() - startTimeMs) + " ms");
303           }
304           if (printExceptionTrace) {
305             LOG.warn(e);
306             printExceptionTrace = false;
307           }
308         }
309       }
310       return gets;
311     }
312 
createGet(long keyToRead)313     protected Get createGet(long keyToRead) throws IOException {
314       Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
315       String cfsString = "";
316       byte[][] columnFamilies = dataGenerator.getColumnFamilies();
317       for (byte[] cf : columnFamilies) {
318         get.addFamily(cf);
319         if (verbose) {
320           if (cfsString.length() > 0) {
321             cfsString += ", ";
322           }
323           cfsString += "[" + Bytes.toStringBinary(cf) + "]";
324         }
325       }
326       get = dataGenerator.beforeGet(keyToRead, get);
327       if (regionReplicaId > 0) {
328         get.setReplicaId(regionReplicaId);
329         get.setConsistency(Consistency.TIMELINE);
330       }
331       if (verbose) {
332         LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
333       }
334       return get;
335     }
336 
queryKey(Get[] gets, boolean verify, long[] keysToRead)337     public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
338       // read the data
339       long start = System.nanoTime();
340       // Uses multi/batch gets
341       Result[] results = table.get(Arrays.asList(gets));
342       long end = System.nanoTime();
343       verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
344     }
345 
queryKey(Get get, boolean verify, long keyToRead)346     public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
347       // read the data
348 
349       long start = System.nanoTime();
350       // Uses simple get
351       Result result = table.get(get);
352       long end = System.nanoTime();
353       verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
354     }
355 
verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano, Result[] results, Table table, boolean isNullExpected)356     protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
357         Result[] results, Table table, boolean isNullExpected)
358         throws IOException {
359       totalOpTimeMs.addAndGet(elapsedNano / 1000000);
360       numKeys.addAndGet(gets.length);
361       int i = 0;
362       for (Result result : results) {
363         verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
364             isNullExpected);
365       }
366     }
367 
verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano, Result result, Table table, boolean isNullExpected)368     protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
369         Result result, Table table, boolean isNullExpected)
370         throws IOException {
371       verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
372           new Result[]{result}, table, isNullExpected);
373     }
374 
verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get, Result result, Table table, boolean isNullExpected)375     private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
376         Result result, Table table, boolean isNullExpected) throws IOException {
377       if (!result.isEmpty()) {
378         if (verify) {
379           numKeysVerified.incrementAndGet();
380         }
381       } else {
382         HRegionLocation hloc = connection.getRegionLocation(tableName,
383           get.getRow(), false);
384         String rowKey = Bytes.toString(get.getRow());
385         LOG.info("Key = " + rowKey + ", Region location: " + hloc);
386         if(isNullExpected) {
387           nullResult.incrementAndGet();
388           LOG.debug("Null result obtained for the key ="+rowKey);
389           return;
390         }
391       }
392       boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
393       long numErrorsAfterThis = 0;
394       if (isOk) {
395         long cols = 0;
396         // Count the columns for reporting purposes.
397         for (byte[] cf : result.getMap().keySet()) {
398           cols += result.getFamilyMap(cf).size();
399         }
400         numCols.addAndGet(cols);
401       } else {
402         if (writer != null) {
403           LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
404         }
405         numErrorsAfterThis = numReadErrors.incrementAndGet();
406       }
407 
408       if (numErrorsAfterThis > maxErrors) {
409         LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
410         aborted = true;
411       }
412     }
413   }
414 
getNumReadFailures()415   public long getNumReadFailures() {
416     return numReadFailures.get();
417   }
418 
getNumReadErrors()419   public long getNumReadErrors() {
420     return numReadErrors.get();
421   }
422 
getNumKeysVerified()423   public long getNumKeysVerified() {
424     return numKeysVerified.get();
425   }
426 
getNumUniqueKeysVerified()427   public long getNumUniqueKeysVerified() {
428     return numUniqueKeysVerified.get();
429   }
430 
getNullResultsCount()431   public long getNullResultsCount() {
432     return nullResult.get();
433   }
434 
435   @Override
progressInfo()436   protected String progressInfo() {
437     StringBuilder sb = new StringBuilder();
438     appendToStatus(sb, "verified", numKeysVerified.get());
439     appendToStatus(sb, "READ FAILURES", numReadFailures.get());
440     appendToStatus(sb, "READ ERRORS", numReadErrors.get());
441     appendToStatus(sb, "NULL RESULT", nullResult.get());
442     return sb.toString();
443   }
444 }
445