/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Sets;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A large test which loads a lot of data that has internal references, and
 * verifies the data.
 *
 * In the load step, 200 map tasks are launched, which in turn write loadmapper.num_to_write
 * (default 100K) rows to an hbase table. Rows are written in blocks, for a total of
 * 100 blocks. Each row in a block contains loadmapper.backrefs (default 50) references
 * to random rows in the previous block.
 *
 * The verify step scans the table and verifies that for every referenced row, the row is
 * actually there (no data loss). Failed rows are output from the reducer to be saved in the
 * job output dir in hdfs and inspected later.
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
 *
 * Originally taken from Apache Bigtop.
 */
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestLoadAndVerify.class);

  private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");

  private static final String NUM_TO_WRITE_KEY = "loadmapper.num_to_write";
  private static final long NUM_TO_WRITE_DEFAULT = 100 * 1000;

  private static final String TABLE_NAME_KEY = "loadmapper.table";
  private static final String TABLE_NAME_DEFAULT = "table";

  private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
  private static final int NUM_BACKREFS_DEFAULT = 50;

  private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks";
  private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks";
  private static final int NUM_MAP_TASKS_DEFAULT = 200;
  private static final int NUM_REDUCE_TASKS_DEFAULT = 35;

  private static final int SCANNER_CACHING = 500;

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private String toRun = null;
  private String keysDir = null;

  private enum Counters {
    ROWS_WRITTEN,
    REFERENCES_WRITTEN,
    REFERENCES_CHECKED
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(3);
    this.setConf(util.getConfiguration());
    if (!util.isDistributedCluster()) {
      getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100);
      getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100);
      getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10);
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  /**
   * Converts a "long" value between endian systems.
   * Borrowed from Apache Commons IO
   * @param value value to convert
   * @return the converted value
   */
  public static long swapLong(long value)
  {
    return
      ( ( ( value >> 0 ) & 0xff ) << 56 ) +
      ( ( ( value >> 8 ) & 0xff ) << 48 ) +
      ( ( ( value >> 16 ) & 0xff ) << 40 ) +
      ( ( ( value >> 24 ) & 0xff ) << 32 ) +
      ( ( ( value >> 32 ) & 0xff ) << 24 ) +
      ( ( ( value >> 40 ) & 0xff ) << 16 ) +
      ( ( ( value >> 48 ) & 0xff ) << 8 ) +
      ( ( ( value >> 56 ) & 0xff ) << 0 );
  }

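  /**
   * Mapper for the load step. Each map task writes loadmapper.num_to_write rows to the
   * test table in blocks of (num_to_write / 100) rows; every row in a block after the
   * first block also carries loadmapper.backrefs references to random rows in the
   * previous block.
   */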
  public static class LoadMapper
      extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable>
  {
    protected long recordsToWrite;
    protected Connection connection;
    protected BufferedMutator mutator;
    protected Configuration conf;
    protected int numBackReferencesPerRow;
    protected String shortTaskId;

    protected Random rand = new Random();

    protected Counter rowsWritten, refsWritten;

    @Override
    public void setup(Context context) throws IOException {
      conf = context.getConfiguration();
      recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
      String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
      numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
      this.connection = ConnectionFactory.createConnection(conf);
      mutator = connection.getBufferedMutator(
          new BufferedMutatorParams(TableName.valueOf(tableName))
              .writeBufferSize(4 * 1024 * 1024));

      String taskId = conf.get("mapreduce.task.attempt.id");
      Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
      if (!matcher.matches()) {
        throw new RuntimeException("Strange task ID: " + taskId);
      }
      shortTaskId = matcher.group(1);

      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
      refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
    }

    @Override
    public void cleanup(Context context) throws IOException {
      mutator.close();
      connection.close();
    }

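    /**
     * Writes this task's rows. The row key is the byte-swapped row index (8 bytes) followed
     * by "/" and the short task id, so rows from different tasks never collide.
     * Back-references are written as extra qualifiers under TEST_FAMILY that name rows of
     * the previous block; each block is flushed before the next block starts referring to it.
     */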
    @Override
    protected void map(NullWritable key, NullWritable value,
        Context context) throws IOException, InterruptedException {

      String suffix = "/" + shortTaskId;
      byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));

      int BLOCK_SIZE = (int) (recordsToWrite / 100);

      for (long i = 0; i < recordsToWrite;) {
        long blockStart = i;
        for (long idxInBlock = 0;
             idxInBlock < BLOCK_SIZE && i < recordsToWrite;
             idxInBlock++, i++) {

          long byteSwapped = swapLong(i);
          Bytes.putLong(row, 0, byteSwapped);

          Put p = new Put(row);
          p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
          if (blockStart > 0) {
            for (int j = 0; j < numBackReferencesPerRow; j++) {
              long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE);
              Bytes.putLong(row, 0, swapLong(referredRow));
              p.add(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
            }
            refsWritten.increment(1);
          }
          rowsWritten.increment(1);
          mutator.mutate(p);

          if (i % 100 == 0) {
            context.setStatus("Written " + i + "/" + recordsToWrite + " records");
            context.progress();
          }
        }
        // End of block, flush all of them before we start writing anything
        // pointing to these!
        mutator.flush();
      }
    }
  }

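  /**
   * Mapper for the verify step. For every cell in a scanned row it emits either
   * (rowKey, EMPTY) when the cell is the row's own TEST_QUALIFIER marker, or
   * (referencedRowKey, rowKey) when the cell is a back-reference, so the reducer sees each
   * referenced row together with everything that points at it.
   */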
  public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
    static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      BytesWritable bwKey = new BytesWritable(key.get());
      BytesWritable bwVal = new BytesWritable();
      for (Cell kv : value.listCells()) {
        if (Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length,
            kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()) == 0) {
          context.write(bwKey, EMPTY);
        } else {
          bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
          context.write(bwVal, bwKey);
        }
      }
    }
  }

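  /**
   * Reducer for the verify step. For each referenced row it checks that the row's own EMPTY
   * marker was seen among the incoming values; rows that were referenced but never written
   * are logged and emitted to the job output for later inspection.
   */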
  public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
    private Counter refsChecked;
    private Counter rowsWritten;

    @Override
    public void setup(Context context) throws IOException {
      refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
    }

    @Override
    protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
        VerifyReducer.Context ctx) throws IOException, InterruptedException {
      boolean gotOriginalRow = false;
      int refCount = 0;

      for (BytesWritable ref : referrers) {
        if (ref.getLength() == 0) {
          assert !gotOriginalRow;
          gotOriginalRow = true;
        } else {
          refCount++;
        }
      }
      refsChecked.increment(refCount);

      if (!gotOriginalRow) {
        String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
        String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
        LOG.error("Reference error row " + parsedRow);
        ctx.write(new Text(binRow), new Text(parsedRow));
        rowsWritten.increment(1);
      }
    }

    private String makeRowReadable(byte[] bytes, int length) {
      long rowIdx = swapLong(Bytes.toLong(bytes, 0));
      String suffix = Bytes.toString(bytes, 8, length - 8);

      return "Row #" + rowIdx + " suffix " + suffix;
    }
  }

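  /**
   * Runs the load MapReduce job against the given table using {@link LoadMapper} and
   * asserts that the job succeeds.
   */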
  protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "load-output");
    LOG.info("Load output dir: " + outputDir);

    NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
    conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());

    Job job = Job.getInstance(conf);
    job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
    job.setJarByClass(this.getClass());
    setMapperClass(job);
    job.setInputFormatClass(NMapInputFormat.class);
    job.setNumReduceTasks(0);
    setJobScannerConf(job);
    FileOutputFormat.setOutputPath(job, outputDir);

    TableMapReduceUtil.addDependencyJars(job);

    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
    TableMapReduceUtil.initCredentials(job);
    assertTrue(job.waitForCompletion(true));
    return job;
  }

  protected void setMapperClass(Job job) {
    job.setMapperClass(LoadMapper.class);
  }

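  /**
   * Runs the verify MapReduce job ({@link VerifyMapper} / {@link VerifyReducer}) over the
   * table and asserts that it completed without emitting any missing-row records.
   */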
  protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "verify-output");
    LOG.info("Verify output dir: " + outputDir);

    Job job = Job.getInstance(conf);
    job.setJarByClass(this.getClass());
    job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
    setJobScannerConf(job);

    Scan scan = new Scan();

    TableMapReduceUtil.initTableMapperJob(
        htd.getTableName().getNameAsString(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
    int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
    TableMapReduceUtil.setScannerCaching(job, scannerCaching);

    job.setReducerClass(VerifyReducer.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
    FileOutputFormat.setOutputPath(job, outputDir);
    assertTrue(job.waitForCompletion(true));

    long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
    assertEquals(0, numOutputRecords);
  }

  /**
   * Tool to search for missing rows in WALs and hfiles.
   * Pass in a file or directory of keys to search for. The key file must have been written
   * by the verify step (we depend on the format it writes out). We read the keys in and then
   * search the hbase WALs and oldWALs dirs (some of this is TODO).
   */
  public static class WALSearcher extends WALPlayer {
    public WALSearcher(Configuration conf) {
      super(conf);
    }

    /**
     * The actual searcher mapper.
     */
    public static class WALMapperSearcher extends WALMapper {
      private SortedSet<byte []> keysToFind;
      private AtomicInteger rows = new AtomicInteger(0);

      @Override
      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
          throws IOException {
        super.setup(context);
        try {
          this.keysToFind = readKeysToSearch(context.getConfiguration());
          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.toString());
        }
      }

      @Override
      protected boolean filter(Context context, Cell cell) {
        // TODO: Can I do a better compare than this copying out key?
        byte [] row = new byte [cell.getRowLength()];
        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
        boolean b = this.keysToFind.contains(row);
        if (b) {
          String keyStr = Bytes.toStringBinary(row);
          try {
            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
          } catch (IOException | InterruptedException e) {
            LOG.warn(e);
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
          }
          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
        }
        return b;
      }
    }

    // Put in place the above WALMapperSearcher.
    @Override
    public Job createSubmittableJob(String[] args) throws IOException {
      Job job = super.createSubmittableJob(args);
      // Call my class instead.
      job.setJarByClass(WALMapperSearcher.class);
      job.setMapperClass(WALMapperSearcher.class);
      job.setOutputFormatClass(NullOutputFormat.class);
      return job;
    }
  }

  static final String FOUND_GROUP_KEY = "Found";
  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

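  /**
   * Reads the row keys to search for from the file or directory named by
   * searcher.keys.inputdir. The files are expected to be the Text output of the verify job:
   * the first whitespace-separated token of each line is a missing row key in
   * Bytes.toStringBinary format.
   */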
  static SortedSet<byte []> readKeysToSearch(final Configuration conf)
      throws IOException, InterruptedException {
    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
    FileSystem fs = FileSystem.get(conf);
    SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
    if (!fs.exists(keysInputDir)) {
      throw new FileNotFoundException(keysInputDir.toString());
    }
    if (!fs.isDirectory(keysInputDir)) {
      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
      readFileToSearch(conf, fs, keyFileStatus, result);
    } else {
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        // Skip "_SUCCESS" file.
        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
        readFileToSearch(conf, fs, keyFileStatus, result);
      }
    }
    return result;
  }

  private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
      final FileSystem fs, final FileStatus keyFileStatus, SortedSet<byte []> result)
      throws IOException, InterruptedException {
    // The verify job uses a file output format and writes <Text, Text>, so read it as text.
    try (InputStream in = fs.open(keyFileStatus.getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
      // Extract the key (the first token of each line); it names a missing row.
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.isEmpty()) continue;

        String[] parts = line.split("\\s+");
        if (parts.length >= 1) {
          String key = parts[0];
          result.add(Bytes.toBytesBinary(key));
        } else {
          LOG.info("Cannot parse key from: " + line);
        }
      }
    }
    return result;
  }

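  /**
   * Runs {@link WALSearcher} over the cluster's WALs and oldWALs directories, looking for the
   * keys in keysDir that the verify job reported as missing. Returns the exit code of the
   * underlying WALPlayer jobs.
   */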
  private int doSearch(Configuration conf, String keysDir) throws Exception {
    Path inputDir = new Path(keysDir);

    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
    SortedSet<byte []> keys = readKeysToSearch(getConf());
    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
    LOG.info("Count of keys to find: " + keys.size());
    for (byte [] key : keys) LOG.info("Key: " + Bytes.toStringBinary(key));
    Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
    // Now read all WALs. In two dirs. Presumes certain layout.
    Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    LOG.info("Running Search with keys inputDir=" + inputDir +
        " against " + getConf().get(HConstants.HBASE_DIR));
    int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
    if (ret != 0) return ret;
    return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
  }

  private static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100;
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int) lpr);
  }

  public Path getTestDir(String testName, String subdir) throws IOException {
    Path testDir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = FileSystem.get(getConf());
    fs.deleteOnExit(testDir);

    return new Path(new Path(testDir, testName), subdir);
  }

  @Test
  public void testLoadAndVerify() throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TEST_NAME));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    Admin admin = getTestingUtil(getConf()).getHBaseAdmin();
    admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40);

    doLoad(getConf(), htd);
    doVerify(getConf(), htd);

    // Only disable and drop if the verify succeeded - otherwise it's useful
    // to leave the table around for post-mortem.
    getTestingUtil(getConf()).deleteTable(htd.getTableName());
  }

  public void usage() {
    System.err.println(this.getClass().getSimpleName()
        + " [-Doptions] <load|verify|loadAndVerify|search>");
    System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
    System.err.println("Options");
    System.err.println("  -Dloadmapper.table=<name>        Table to write/verify (default autogen)");
    System.err.println("  -Dloadmapper.backrefs=<n>        Number of backreferences per row (default 50)");
    System.err.println("  -Dloadmapper.num_to_write=<n>    Number of rows per mapper (default 100,000)");
    System.err.println("  -Dloadmapper.deleteAfter=<bool>  Delete after a successful verify (default true)");
    System.err.println("  -Dloadmapper.numPresplits=<n>    Number of presplit regions to start with (default 40)");
    System.err.println("  -Dloadmapper.map.tasks=<n>       Number of map tasks for load (default 200)");
    System.err.println("  -Dverify.reduce.tasks=<n>        Number of reduce tasks for verify (default 35)");
    System.err.println("  -Dverify.scannercaching=<n>      Number of rows for hbase scanner caching during verify (default 500)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    String[] args = cmd.getArgs();
    if (args == null || args.length < 1) {
      usage();
      throw new RuntimeException("Incorrect Number of args.");
    }
    toRun = args[0];
    if (toRun.equalsIgnoreCase("search")) {
      if (args.length > 1) {
        keysDir = args[1];
      }
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    IntegrationTestingUtility.setUseDistributedCluster(getConf());
    boolean doLoad = false;
    boolean doVerify = false;
    boolean doSearch = false;
    boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
    int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);

    if (toRun.equalsIgnoreCase("load")) {
      doLoad = true;
    } else if (toRun.equalsIgnoreCase("verify")) {
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
      doLoad = true;
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("search")) {
      doLoad = false;
      doVerify = false;
      doSearch = true;
      if (keysDir == null) {
        System.err.println("Usage: search <KEYS_DIR>");
        return 1;
      }
    } else {
      System.err.println("Invalid argument " + toRun);
      usage();
      return 1;
    }

    // create HTableDescriptor for specified table
    TableName table = getTablename();
    HTableDescriptor htd = new HTableDescriptor(table);
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    if (doLoad) {
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
        doLoad(getConf(), htd);
      }
    }
    if (doVerify) {
      doVerify(getConf(), htd);
      if (doDelete) {
        getTestingUtil(getConf()).deleteTable(htd.getTableName());
      }
    }
    if (doSearch) {
      return doSearch(getConf(), keysDir);
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return TableName.valueOf(getConf().get(TABLE_NAME_KEY, TEST_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(TEST_FAMILY));
  }

  public static void main(String[] argv) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv);
    System.exit(ret);
  }
}