/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Sets;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A large test which loads a lot of data that has internal references, and
 * then verifies the data.
 *
 * In the load step, 200 map tasks are launched, each of which writes loadmapper.num_to_write
 * (default 100K) rows to an hbase table. Rows are written in blocks, for a total of
 * 100 blocks per task. Each row in a block contains loadmapper.backrefs (default 50) references
 * to random rows in the previous block.
 *
 * The verify step scans the table and checks that every referenced row is actually
 * there (no data loss). Failed rows are output from the reduce phase and saved in the
 * job output dir in HDFS for later inspection.
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
 *
 * Originally taken from Apache Bigtop.
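 *
 * A typical command-line invocation (assuming the hbase launcher script is on the PATH and
 * picks up the cluster configuration; adjust the options for your deployment) looks
 * something like:
 * <pre>
 *   hbase org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify \
 *       -Dloadmapper.num_to_write=100000 -Dloadmapper.backrefs=50 loadAndVerify
 * </pre>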
 */
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestLoadAndVerify.class);

  private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");

  private static final String NUM_TO_WRITE_KEY =
    "loadmapper.num_to_write";
  private static final long NUM_TO_WRITE_DEFAULT = 100*1000;

  private static final String TABLE_NAME_KEY = "loadmapper.table";
  private static final String TABLE_NAME_DEFAULT = "table";

  private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
  private static final int NUM_BACKREFS_DEFAULT = 50;

  private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks";
  private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks";
  private static final int NUM_MAP_TASKS_DEFAULT = 200;
  private static final int NUM_REDUCE_TASKS_DEFAULT = 35;

  private static final int SCANNER_CACHING = 500;

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private String toRun = null;
  private String keysDir = null;

  private enum Counters {
    ROWS_WRITTEN,
    REFERENCES_WRITTEN,
    REFERENCES_CHECKED
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(3);
    this.setConf(util.getConfiguration());
    if (!util.isDistributedCluster()) {
      getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100);
      getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100);
      getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10);
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  /**
   * Converts a "long" value between endian systems.
   * Borrowed from Apache Commons IO.
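   * For example, {@code swapLong(1L)} returns {@code 0x0100000000000000L}; the byte
   * order is simply reversed.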
   * @param value value to convert
   * @return the converted value
   */
  public static long swapLong(long value)
  {
    return
      ( ( ( value >> 0 ) & 0xff ) << 56 ) +
      ( ( ( value >> 8 ) & 0xff ) << 48 ) +
      ( ( ( value >> 16 ) & 0xff ) << 40 ) +
      ( ( ( value >> 24 ) & 0xff ) << 32 ) +
      ( ( ( value >> 32 ) & 0xff ) << 24 ) +
      ( ( ( value >> 40 ) & 0xff ) << 16 ) +
      ( ( ( value >> 48 ) & 0xff ) << 8 ) +
      ( ( ( value >> 56 ) & 0xff ) << 0 );
  }

  public static class LoadMapper
      extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable>
  {
    protected long recordsToWrite;
    protected Connection connection;
    protected BufferedMutator mutator;
    protected Configuration conf;
    protected int numBackReferencesPerRow;
    protected String shortTaskId;

    protected Random rand = new Random();

    protected Counter rowsWritten, refsWritten;

    @Override
    public void setup(Context context) throws IOException {
      conf = context.getConfiguration();
      recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
      String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
      numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
      this.connection = ConnectionFactory.createConnection(conf);
      mutator = connection.getBufferedMutator(
          new BufferedMutatorParams(TableName.valueOf(tableName))
              .writeBufferSize(4 * 1024 * 1024));

      String taskId = conf.get("mapreduce.task.attempt.id");
      Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
      if (!matcher.matches()) {
        throw new RuntimeException("Strange task ID: " + taskId);
      }
      shortTaskId = matcher.group(1);

      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
      refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
    }

    @Override
    public void cleanup(Context context) throws IOException {
      mutator.close();
      connection.close();
    }

    @Override
    protected void map(NullWritable key, NullWritable value,
        Context context) throws IOException, InterruptedException {

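      // Each row key is an 8-byte, byte-swapped record index followed by "/<shortTaskId>",
      // so sequential records scatter across the keyspace and rows from different map tasks
      // never collide.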
      String suffix = "/" + shortTaskId;
      byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));

      int BLOCK_SIZE = (int)(recordsToWrite / 100);

      for (long i = 0; i < recordsToWrite;) {
        long blockStart = i;
        for (long idxInBlock = 0;
             idxInBlock < BLOCK_SIZE && i < recordsToWrite;
             idxInBlock++, i++) {

          long byteSwapped = swapLong(i);
          Bytes.putLong(row, 0, byteSwapped);

          Put p = new Put(row);
          p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
          if (blockStart > 0) {
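            // Rows in the first block have nothing to refer back to; every other row adds
            // backreference qualifiers pointing at random rows in the previous block.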
            for (int j = 0; j < numBackReferencesPerRow; j++) {
              long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE);
              Bytes.putLong(row, 0, swapLong(referredRow));
              p.add(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
            }
            refsWritten.increment(1);
          }
          rowsWritten.increment(1);
          mutator.mutate(p);

          if (i % 100 == 0) {
            context.setStatus("Written " + i + "/" + recordsToWrite + " records");
            context.progress();
          }
        }
        // End of block. Flush all of these rows before we start writing anything
        // pointing back to them!
        mutator.flush();
      }
    }
  }

  public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
    static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
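      // Emit (rowKey, EMPTY) for the row's own marker cell and (referredRowKey, rowKey) for
      // each backreference qualifier, so the reducer can pair every reference with the row
      // it points to.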
      BytesWritable bwKey = new BytesWritable(key.get());
      BytesWritable bwVal = new BytesWritable();
      for (Cell kv : value.listCells()) {
        if (Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length,
                            kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()) == 0) {
          context.write(bwKey, EMPTY);
        } else {
          bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
          context.write(bwVal, bwKey);
        }
      }
    }
  }

  public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
    private Counter refsChecked;
    private Counter rowsWritten;

    @Override
    public void setup(Context context) throws IOException {
      refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
    }

    @Override
    protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
        VerifyReducer.Context ctx) throws IOException, InterruptedException {
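      // An EMPTY value means the referred row itself was scanned; every other value is the
      // key of a row that referenced it. References without the EMPTY marker mean the
      // referred row is missing.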
      boolean gotOriginalRow = false;
      int refCount = 0;

      for (BytesWritable ref : referrers) {
        if (ref.getLength() == 0) {
          assert !gotOriginalRow;
          gotOriginalRow = true;
        } else {
          refCount++;
        }
      }
      refsChecked.increment(refCount);

      if (!gotOriginalRow) {
        String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
        String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
        LOG.error("Reference error row " + parsedRow);
        ctx.write(new Text(binRow), new Text(parsedRow));
        rowsWritten.increment(1);
      }
    }

    private String makeRowReadable(byte[] bytes, int length) {
      long rowIdx = swapLong(Bytes.toLong(bytes, 0));
      String suffix = Bytes.toString(bytes, 8, length - 8);

      return "Row #" + rowIdx + " suffix " + suffix;
    }
  }

  protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "load-output");
    LOG.info("Load output dir: " + outputDir);

    NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
    conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());

    Job job = Job.getInstance(conf);
    job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
    job.setJarByClass(this.getClass());
    setMapperClass(job);
    job.setInputFormatClass(NMapInputFormat.class);
    job.setNumReduceTasks(0);
    setJobScannerConf(job);
    FileOutputFormat.setOutputPath(job, outputDir);

    TableMapReduceUtil.addDependencyJars(job);

    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
    TableMapReduceUtil.initCredentials(job);
    assertTrue(job.waitForCompletion(true));
    return job;
  }

  protected void setMapperClass(Job job) {
    job.setMapperClass(LoadMapper.class);
  }

  protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "verify-output");
    LOG.info("Verify output dir: " + outputDir);

    Job job = Job.getInstance(conf);
    job.setJarByClass(this.getClass());
    job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
    setJobScannerConf(job);

    Scan scan = new Scan();

    TableMapReduceUtil.initTableMapperJob(
        htd.getTableName().getNameAsString(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
    int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
    TableMapReduceUtil.setScannerCaching(job, scannerCaching);

    job.setReducerClass(VerifyReducer.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
    FileOutputFormat.setOutputPath(job, outputDir);
    assertTrue(job.waitForCompletion(true));

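    // In the verify job, ROWS_WRITTEN is incremented only by VerifyReducer for rows it could
    // not find, so any non-zero value here indicates data loss.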
    long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
    assertEquals(0, numOutputRecords);
  }

  /**
   * Tool to search for missing rows in WALs and hfiles.
   * Pass in a file or directory of keys to search for. The key file must have been written by
   * the verify step (we depend on the format it writes out). We read the keys in and then
   * search the hbase WALs and oldWALs dirs (some of this is TODO).
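   * Typically driven through this test's {@code search} command, with the verify step's
   * output directory passed as the keys dir.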
   */
  public static class WALSearcher extends WALPlayer {
    public WALSearcher(Configuration conf) {
      super(conf);
    }

    /**
     * The actual searcher mapper.
     */
    public static class WALMapperSearcher extends WALMapper {
      private SortedSet<byte []> keysToFind;
      private AtomicInteger rows = new AtomicInteger(0);

      @Override
      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
          throws IOException {
        super.setup(context);
        try {
          this.keysToFind = readKeysToSearch(context.getConfiguration());
          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.toString());
        }
      }

      @Override
      protected boolean filter(Context context, Cell cell) {
        // TODO: Can I do a better compare than this copying out key?
        byte [] row = new byte [cell.getRowLength()];
        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
        boolean b = this.keysToFind.contains(row);
        if (b) {
          String keyStr = Bytes.toStringBinary(row);
          try {
            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
          } catch (IOException|InterruptedException e) {
            LOG.warn(e);
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
          }
          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
        }
        return b;
      }
    }

    // Put in place the above WALMapperSearcher.
    @Override
    public Job createSubmittableJob(String[] args) throws IOException {
      Job job = super.createSubmittableJob(args);
      // Call my class instead.
      job.setJarByClass(WALMapperSearcher.class);
      job.setMapperClass(WALMapperSearcher.class);
      job.setOutputFormatClass(NullOutputFormat.class);
      return job;
    }
  }

  static final String FOUND_GROUP_KEY = "Found";
  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

  static SortedSet<byte []> readKeysToSearch(final Configuration conf)
      throws IOException, InterruptedException {
    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
    FileSystem fs = FileSystem.get(conf);
    SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
    if (!fs.exists(keysInputDir)) {
      throw new FileNotFoundException(keysInputDir.toString());
    }
    if (!fs.isDirectory(keysInputDir)) {
      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
      readFileToSearch(conf, fs, keyFileStatus, result);
    } else {
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        // Skip "_SUCCESS" file.
        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
        readFileToSearch(conf, fs, keyFileStatus, result);
      }
    }
    return result;
  }

  private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
      final FileSystem fs, final FileStatus keyFileStatus, SortedSet<byte[]> result)
      throws IOException, InterruptedException {
    // The verify step uses a file output format and writes <Text, Text>, so we can read its
    // output as a text file.
    try (InputStream in = fs.open(keyFileStatus.getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
      // Extract the key from each line and record it as a missing key.
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.isEmpty()) continue;

        String[] parts = line.split("\\s+");
        if (parts.length >= 1) {
          String key = parts[0];
          result.add(Bytes.toBytesBinary(key));
        } else {
          LOG.info("Cannot parse key from: " + line);
        }
      }
    }
    return result;
  }

  private int doSearch(Configuration conf, String keysDir) throws Exception {
    Path inputDir = new Path(keysDir);

    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
    SortedSet<byte []> keys = readKeysToSearch(getConf());
    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
    LOG.info("Count of keys to find: " + keys.size());
    for (byte [] key : keys) LOG.info("Key: " + Bytes.toStringBinary(key));
    Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
    // Now read all WALs, in two dirs. Presumes a certain layout.
    Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    LOG.info("Running Search with keys inputDir=" + inputDir +
      " against " + getConf().get(HConstants.HBASE_DIR));
    int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
    if (ret != 0) return ret;
    return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
  }

  private static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100;
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int)lpr);
  }

  public Path getTestDir(String testName, String subdir) throws IOException {
    Path testDir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = FileSystem.get(getConf());
    fs.deleteOnExit(testDir);

    return new Path(new Path(testDir, testName), subdir);
  }

  @Test
  public void testLoadAndVerify() throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TEST_NAME));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    Admin admin = getTestingUtil(getConf()).getHBaseAdmin();
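    // Presplit across the full 8-byte key space (0x00...00 through 0xff...ff) into 40 regions
    // so the byte-swapped row keys are spread evenly from the start.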
    admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40);

    doLoad(getConf(), htd);
    doVerify(getConf(), htd);

    // Only disable and drop the table if the verify succeeded; otherwise it's useful
    // to leave it around for post-mortem analysis.
    getTestingUtil(getConf()).deleteTable(htd.getTableName());
  }

  public void usage() {
    System.err.println(this.getClass().getSimpleName()
      + " [-Doptions] <load|verify|loadAndVerify|search>");
    System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
    System.err.println("Options");
    System.err.println("  -Dloadmapper.table=<name>        Table to write/verify (default autogen)");
    System.err.println("  -Dloadmapper.backrefs=<n>        Number of backreferences per row (default 50)");
    System.err.println("  -Dloadmapper.num_to_write=<n>    Number of rows to write per mapper (default 100,000)");
    System.err.println("  -Dloadmapper.deleteAfter=<bool>  Delete the table after a successful verify (default true)");
    System.err.println("  -Dloadmapper.numPresplits=<n>    Number of presplit regions to start with (default 40)");
    System.err.println("  -Dloadmapper.map.tasks=<n>       Number of map tasks for load (default 200)");
    System.err.println("  -Dverify.reduce.tasks=<n>        Number of reduce tasks for verify (default 35)");
    System.err.println("  -Dverify.scannercaching=<n>      Number of rows for hbase scanner caching during verify (default 500)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    String[] args = cmd.getArgs();
    if (args == null || args.length < 1) {
      usage();
      throw new RuntimeException("Incorrect number of args.");
    }
    toRun = args[0];
    if (toRun.equalsIgnoreCase("search")) {
      if (args.length > 1) {
        keysDir = args[1];
      }
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    IntegrationTestingUtility.setUseDistributedCluster(getConf());
    boolean doLoad = false;
    boolean doVerify = false;
    boolean doSearch = false;
    boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
    int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);

    if (toRun.equalsIgnoreCase("load")) {
      doLoad = true;
    } else if (toRun.equalsIgnoreCase("verify")) {
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
      doLoad = true;
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("search")) {
      doLoad = false;
      doVerify = false;
      doSearch = true;
      if (keysDir == null) {
        System.err.println("Usage: search <KEYS_DIR>");
        return 1;
      }
    } else {
      System.err.println("Invalid argument " + toRun);
      usage();
      return 1;
    }

    // create HTableDescriptor for specified table
    TableName table = getTablename();
    HTableDescriptor htd = new HTableDescriptor(table);
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    if (doLoad) {
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
        doLoad(getConf(), htd);
      }
    }
    if (doVerify) {
      doVerify(getConf(), htd);
      if (doDelete) {
        getTestingUtil(getConf()).deleteTable(htd.getTableName());
      }
    }
    if (doSearch) {
      return doSearch(getConf(), keysDir);
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return TableName.valueOf(getConf().get(TABLE_NAME_KEY, TEST_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(TEST_FAMILY));
  }

  public static void main(String argv[]) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv);
    System.exit(ret);
  }
}
656