1 /*
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.client;
20 
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25 
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.ThreadPoolExecutor;
32 
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.HBaseTestingUtility;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionLocation;
40 import org.apache.hadoop.hbase.testclassification.MediumTests;
41 import org.apache.hadoop.hbase.ServerName;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.Waiter;
44 import org.apache.hadoop.hbase.codec.KeyValueCodec;
45 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.apache.hadoop.hbase.util.JVMClusterUtil;
48 import org.apache.hadoop.hbase.util.Threads;
49 import org.junit.AfterClass;
50 import org.junit.Assert;
51 import org.junit.Before;
52 import org.junit.BeforeClass;
53 import org.junit.Ignore;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56 
57 @Category(MediumTests.class)
58 public class TestMultiParallel {
59   private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
60 
61   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
62   private static final byte[] VALUE = Bytes.toBytes("value");
63   private static final byte[] QUALIFIER = Bytes.toBytes("qual");
64   private static final String FAMILY = "family";
65   private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
66   private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
67   private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
68   private static final byte [][] KEYS = makeKeys();
69 
70   private static final int slaves = 5; // also used for testing HTable pool size
71   private static Connection CONNECTION;
72 
beforeClass()73   @BeforeClass public static void beforeClass() throws Exception {
74     // Uncomment the following lines if more verbosity is needed for
75     // debugging (see HBASE-12285 for details).
76     //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
77     //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
78     //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
79     UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
80         KeyValueCodec.class.getCanonicalName());
81     UTIL.startMiniCluster(slaves);
82     HTable t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
83     UTIL.waitTableEnabled(TEST_TABLE);
84     t.close();
85     CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
86   }
87 
afterClass()88   @AfterClass public static void afterClass() throws Exception {
89     CONNECTION.close();
90     UTIL.shutdownMiniCluster();
91   }
92 
before()93   @Before public void before() throws Exception {
94     LOG.info("before");
95     if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
96       // Distribute regions
97       UTIL.getMiniHBaseCluster().getMaster().balance();
98 
99       // Wait until completing balance
100       UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
101     }
102     LOG.info("before done");
103   }
104 
makeKeys()105   private static byte[][] makeKeys() {
106     byte [][] starterKeys = HBaseTestingUtility.KEYS;
107     // Create a "non-uniform" test set with the following characteristics:
108     // a) Unequal number of keys per region
109 
110     // Don't use integer as a multiple, so that we have a number of keys that is
111     // not a multiple of the number of regions
112     int numKeys = (int) ((float) starterKeys.length * 10.33F);
113 
114     List<byte[]> keys = new ArrayList<byte[]>();
115     for (int i = 0; i < numKeys; i++) {
116       int kIdx = i % starterKeys.length;
117       byte[] k = starterKeys[kIdx];
118       byte[] cp = new byte[k.length + 1];
119       System.arraycopy(k, 0, cp, 0, k.length);
120       cp[k.length] = new Integer(i % 256).byteValue();
121       keys.add(cp);
122     }
123 
124     // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
125     // should work)
126     // c) keys are not in sorted order (within a region), to ensure that the
127     // sorting code and index mapping doesn't break the functionality
128     for (int i = 0; i < 100; i++) {
129       int kIdx = i % starterKeys.length;
130       byte[] k = starterKeys[kIdx];
131       byte[] cp = new byte[k.length + 1];
132       System.arraycopy(k, 0, cp, 0, k.length);
133       cp[k.length] = new Integer(i % 256).byteValue();
134       keys.add(cp);
135     }
136     return keys.toArray(new byte [][] {new byte [] {}});
137   }
138 
139 
140   /**
141    * This is for testing the active number of threads that were used while
142    * doing a batch operation. It inserts one row per region via the batch
143    * operation, and then checks the number of active threads.
144    * For HBASE-3553
145    * @throws IOException
146    * @throws InterruptedException
147    * @throws NoSuchFieldException
148    * @throws SecurityException
149    */
150   @Ignore ("Nice bug flakey... expected 5 but was 4..") @Test(timeout=300000)
testActiveThreadsCount()151   public void testActiveThreadsCount() throws Exception {
152     try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) {
153       ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
154       try {
155         try (Table t = connection.getTable(TEST_TABLE, executor)) {
156           List<Put> puts = constructPutRequests(); // creates a Put for every region
157           t.batch(puts);
158           HashSet<ServerName> regionservers = new HashSet<ServerName>();
159           try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
160             for (Row r : puts) {
161               HRegionLocation location = locator.getRegionLocation(r.getRow());
162               regionservers.add(location.getServerName());
163             }
164           }
165           assertEquals(regionservers.size(), executor.getLargestPoolSize());
166         }
167       } finally {
168         executor.shutdownNow();
169       }
170     }
171   }
172 
173   @Test(timeout=300000)
testBatchWithGet()174   public void testBatchWithGet() throws Exception {
175     LOG.info("test=testBatchWithGet");
176     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
177 
178     // load test data
179     List<Put> puts = constructPutRequests();
180     table.batch(puts);
181 
182     // create a list of gets and run it
183     List<Row> gets = new ArrayList<Row>();
184     for (byte[] k : KEYS) {
185       Get get = new Get(k);
186       get.addColumn(BYTES_FAMILY, QUALIFIER);
187       gets.add(get);
188     }
189     Result[] multiRes = new Result[gets.size()];
190     table.batch(gets, multiRes);
191 
192     // Same gets using individual call API
193     List<Result> singleRes = new ArrayList<Result>();
194     for (Row get : gets) {
195       singleRes.add(table.get((Get) get));
196     }
197     // Compare results
198     Assert.assertEquals(singleRes.size(), multiRes.length);
199     for (int i = 0; i < singleRes.size(); i++) {
200       Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
201       Cell[] singleKvs = singleRes.get(i).rawCells();
202       Cell[] multiKvs = multiRes[i].rawCells();
203       for (int j = 0; j < singleKvs.length; j++) {
204         Assert.assertEquals(singleKvs[j], multiKvs[j]);
205         Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
206             CellUtil.cloneValue(multiKvs[j])));
207       }
208     }
209     table.close();
210   }
211 
212   @Test
testBadFam()213   public void testBadFam() throws Exception {
214     LOG.info("test=testBadFam");
215     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
216 
217     List<Row> actions = new ArrayList<Row>();
218     Put p = new Put(Bytes.toBytes("row1"));
219     p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
220     actions.add(p);
221     p = new Put(Bytes.toBytes("row2"));
222     p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
223     actions.add(p);
224 
225     // row1 and row2 should be in the same region.
226 
227     Object [] r = new Object[actions.size()];
228     try {
229       table.batch(actions, r);
230       fail();
231     } catch (RetriesExhaustedWithDetailsException ex) {
232       LOG.debug(ex);
233       // good!
234       assertFalse(ex.mayHaveClusterIssues());
235     }
236     assertEquals(2, r.length);
237     assertTrue(r[0] instanceof Throwable);
238     assertTrue(r[1] instanceof Result);
239     table.close();
240   }
241 
242   @Test (timeout=300000)
testFlushCommitsNoAbort()243   public void testFlushCommitsNoAbort() throws Exception {
244     LOG.info("test=testFlushCommitsNoAbort");
245     doTestFlushCommits(false);
246   }
247 
248   /**
249    * Only run one Multi test with a forced RegionServer abort. Otherwise, the
250    * unit tests will take an unnecessarily long time to run.
251    *
252    * @throws Exception
253    */
254   @Test (timeout=360000)
testFlushCommitsWithAbort()255   public void testFlushCommitsWithAbort() throws Exception {
256     LOG.info("test=testFlushCommitsWithAbort");
257     doTestFlushCommits(true);
258   }
259 
260   /**
261    * Set table auto flush to false and test flushing commits
262    * @param doAbort true if abort one regionserver in the testing
263    * @throws Exception
264    */
doTestFlushCommits(boolean doAbort)265   private void doTestFlushCommits(boolean doAbort) throws Exception {
266     // Load the data
267     LOG.info("get new table");
268     Table table = UTIL.getConnection().getTable(TEST_TABLE);
269     table.setWriteBufferSize(10 * 1024 * 1024);
270 
271     LOG.info("constructPutRequests");
272     List<Put> puts = constructPutRequests();
273     table.put(puts);
274     LOG.info("puts");
275     final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
276         .size();
277     assert liveRScount > 0;
278     JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
279         .getLiveRegionServerThreads().get(0);
280     if (doAbort) {
281       liveRS.getRegionServer().abort("Aborting for tests",
282           new Exception("doTestFlushCommits"));
283       // If we wait for no regions being online after we abort the server, we
284       // could ensure the master has re-assigned the regions on killed server
285       // after writing successfully. It means the server we aborted is dead
286       // and detected by matser
287       while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
288         Thread.sleep(100);
289       }
290       // try putting more keys after the abort. same key/qual... just validating
291       // no exceptions thrown
292       puts = constructPutRequests();
293       table.put(puts);
294     }
295 
296     LOG.info("validating loaded data");
297     validateLoadedData(table);
298 
299     // Validate server and region count
300     List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
301     int count = 0;
302     for (JVMClusterUtil.RegionServerThread t: liveRSs) {
303       count++;
304       LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
305     }
306     LOG.info("Count=" + count);
307     Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
308         (doAbort ? (liveRScount - 1) : liveRScount), count);
309     if (doAbort) {
310       UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
311       UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
312         @Override
313         public boolean evaluate() throws Exception {
314           return UTIL.getMiniHBaseCluster().getMaster()
315               .getClusterStatus().getServersSize() == (liveRScount - 1);
316         }
317       });
318       UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
319     }
320 
321     table.close();
322     LOG.info("done");
323   }
324 
325   @Test (timeout=300000)
testBatchWithPut()326   public void testBatchWithPut() throws Exception {
327     LOG.info("test=testBatchWithPut");
328     Table table = CONNECTION.getTable(TEST_TABLE);
329     // put multiple rows using a batch
330     List<Put> puts = constructPutRequests();
331 
332     Object[] results = table.batch(puts);
333     validateSizeAndEmpty(results, KEYS.length);
334 
335     if (true) {
336       int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
337       assert liveRScount > 0;
338       JVMClusterUtil.RegionServerThread liveRS =
339         UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
340       liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
341       puts = constructPutRequests();
342       try {
343         results = table.batch(puts);
344       } catch (RetriesExhaustedWithDetailsException ree) {
345         LOG.info(ree.getExhaustiveDescription());
346         table.close();
347         throw ree;
348       }
349       validateSizeAndEmpty(results, KEYS.length);
350     }
351 
352     validateLoadedData(table);
353     table.close();
354   }
355 
356   @Test(timeout=300000)
testBatchWithDelete()357   public void testBatchWithDelete() throws Exception {
358     LOG.info("test=testBatchWithDelete");
359     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
360 
361     // Load some data
362     List<Put> puts = constructPutRequests();
363     Object[] results = table.batch(puts);
364     validateSizeAndEmpty(results, KEYS.length);
365 
366     // Deletes
367     List<Row> deletes = new ArrayList<Row>();
368     for (int i = 0; i < KEYS.length; i++) {
369       Delete delete = new Delete(KEYS[i]);
370       delete.addFamily(BYTES_FAMILY);
371       deletes.add(delete);
372     }
373     results = table.batch(deletes);
374     validateSizeAndEmpty(results, KEYS.length);
375 
376     // Get to make sure ...
377     for (byte[] k : KEYS) {
378       Get get = new Get(k);
379       get.addColumn(BYTES_FAMILY, QUALIFIER);
380       Assert.assertFalse(table.exists(get));
381     }
382     table.close();
383   }
384 
385   @Test(timeout=300000)
testHTableDeleteWithList()386   public void testHTableDeleteWithList() throws Exception {
387     LOG.info("test=testHTableDeleteWithList");
388     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
389 
390     // Load some data
391     List<Put> puts = constructPutRequests();
392     Object[] results = table.batch(puts);
393     validateSizeAndEmpty(results, KEYS.length);
394 
395     // Deletes
396     ArrayList<Delete> deletes = new ArrayList<Delete>();
397     for (int i = 0; i < KEYS.length; i++) {
398       Delete delete = new Delete(KEYS[i]);
399       delete.deleteFamily(BYTES_FAMILY);
400       deletes.add(delete);
401     }
402     table.delete(deletes);
403     Assert.assertTrue(deletes.isEmpty());
404 
405     // Get to make sure ...
406     for (byte[] k : KEYS) {
407       Get get = new Get(k);
408       get.addColumn(BYTES_FAMILY, QUALIFIER);
409       Assert.assertFalse(table.exists(get));
410     }
411     table.close();
412   }
413 
414   @Test(timeout=300000)
testBatchWithManyColsInOneRowGetAndPut()415   public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
416     LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
417     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
418 
419     List<Row> puts = new ArrayList<Row>();
420     for (int i = 0; i < 100; i++) {
421       Put put = new Put(ONE_ROW);
422       byte[] qual = Bytes.toBytes("column" + i);
423       put.add(BYTES_FAMILY, qual, VALUE);
424       puts.add(put);
425     }
426     Object[] results = table.batch(puts);
427 
428     // validate
429     validateSizeAndEmpty(results, 100);
430 
431     // get the data back and validate that it is correct
432     List<Row> gets = new ArrayList<Row>();
433     for (int i = 0; i < 100; i++) {
434       Get get = new Get(ONE_ROW);
435       byte[] qual = Bytes.toBytes("column" + i);
436       get.addColumn(BYTES_FAMILY, qual);
437       gets.add(get);
438     }
439 
440     Object[] multiRes = table.batch(gets);
441 
442     int idx = 0;
443     for (Object r : multiRes) {
444       byte[] qual = Bytes.toBytes("column" + idx);
445       validateResult(r, qual, VALUE);
446       idx++;
447     }
448     table.close();
449   }
450 
451   @Test(timeout=300000)
testBatchWithIncrementAndAppend()452   public void testBatchWithIncrementAndAppend() throws Exception {
453     LOG.info("test=testBatchWithIncrementAndAppend");
454     final byte[] QUAL1 = Bytes.toBytes("qual1");
455     final byte[] QUAL2 = Bytes.toBytes("qual2");
456     final byte[] QUAL3 = Bytes.toBytes("qual3");
457     final byte[] QUAL4 = Bytes.toBytes("qual4");
458     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
459     Delete d = new Delete(ONE_ROW);
460     table.delete(d);
461     Put put = new Put(ONE_ROW);
462     put.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
463     put.add(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
464     table.put(put);
465 
466     Increment inc = new Increment(ONE_ROW);
467     inc.addColumn(BYTES_FAMILY, QUAL2, 1);
468     inc.addColumn(BYTES_FAMILY, QUAL3, 1);
469 
470     Append a = new Append(ONE_ROW);
471     a.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
472     a.add(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
473     List<Row> actions = new ArrayList<Row>();
474     actions.add(inc);
475     actions.add(a);
476 
477     Object[] multiRes = table.batch(actions);
478     validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
479     validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
480     validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
481     validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
482     table.close();
483   }
484 
485   @Test(timeout=300000)
testNonceCollision()486   public void testNonceCollision() throws Exception {
487     LOG.info("test=testNonceCollision");
488     final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
489     Table table = connection.getTable(TEST_TABLE);
490     Put put = new Put(ONE_ROW);
491     put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
492 
493     // Replace nonce manager with the one that returns each nonce twice.
494     NonceGenerator cnm = new PerClientRandomNonceGenerator() {
495       long lastNonce = -1;
496       @Override
497       public synchronized long newNonce() {
498         long nonce = 0;
499         if (lastNonce == -1) {
500           lastNonce = nonce = super.newNonce();
501         } else {
502           nonce = lastNonce;
503           lastNonce = -1L;
504         }
505         return nonce;
506       }
507     };
508 
509     NonceGenerator oldCnm =
510       ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm);
511 
512     // First test sequential requests.
513     try {
514       Increment inc = new Increment(ONE_ROW);
515       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
516       table.increment(inc);
517       inc = new Increment(ONE_ROW);
518       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
519       try {
520         table.increment(inc);
521         fail("Should have thrown an exception");
522       } catch (OperationConflictException ex) {
523       }
524       Get get = new Get(ONE_ROW);
525       get.addColumn(BYTES_FAMILY, QUALIFIER);
526       Result result = table.get(get);
527       validateResult(result, QUALIFIER, Bytes.toBytes(1L));
528 
529       // Now run a bunch of requests in parallel, exactly half should succeed.
530       int numRequests = 40;
531       final CountDownLatch startedLatch = new CountDownLatch(numRequests);
532       final CountDownLatch startLatch = new CountDownLatch(1);
533       final CountDownLatch doneLatch = new CountDownLatch(numRequests);
534       for (int i = 0; i < numRequests; ++i) {
535         Runnable r = new Runnable() {
536           @Override
537           public void run() {
538             Table table = null;
539             try {
540               table = connection.getTable(TEST_TABLE);
541             } catch (IOException e) {
542               fail("Not expected");
543             }
544             Increment inc = new Increment(ONE_ROW);
545             inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
546             startedLatch.countDown();
547             try {
548               startLatch.await();
549             } catch (InterruptedException e) {
550               fail("Not expected");
551             }
552             try {
553               table.increment(inc);
554             } catch (OperationConflictException ex) { // Some threads are expected to fail.
555             } catch (IOException ioEx) {
556               fail("Not expected");
557             }
558             doneLatch.countDown();
559           }
560         };
561         Threads.setDaemonThreadRunning(new Thread(r));
562       }
563       startedLatch.await(); // Wait until all threads are ready...
564       startLatch.countDown(); // ...and unleash the herd!
565       doneLatch.await();
566       // Now verify
567       get = new Get(ONE_ROW);
568       get.addColumn(BYTES_FAMILY, QUALIFIER);
569       result = table.get(get);
570       validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
571       table.close();
572     } finally {
573       ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm);
574     }
575   }
576 
577   @Test(timeout=300000)
testBatchWithMixedActions()578   public void testBatchWithMixedActions() throws Exception {
579     LOG.info("test=testBatchWithMixedActions");
580     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
581 
582     // Load some data to start
583     Object[] results = table.batch(constructPutRequests());
584     validateSizeAndEmpty(results, KEYS.length);
585 
586     // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
587     // put
588     List<Row> actions = new ArrayList<Row>();
589 
590     byte[] qual2 = Bytes.toBytes("qual2");
591     byte[] val2 = Bytes.toBytes("putvalue2");
592 
593     // 0 get
594     Get get = new Get(KEYS[10]);
595     get.addColumn(BYTES_FAMILY, QUALIFIER);
596     actions.add(get);
597 
598     // 1 get
599     get = new Get(KEYS[11]);
600     get.addColumn(BYTES_FAMILY, QUALIFIER);
601     actions.add(get);
602 
603     // 2 put of new column
604     Put put = new Put(KEYS[10]);
605     put.add(BYTES_FAMILY, qual2, val2);
606     actions.add(put);
607 
608     // 3 delete
609     Delete delete = new Delete(KEYS[20]);
610     delete.deleteFamily(BYTES_FAMILY);
611     actions.add(delete);
612 
613     // 4 get
614     get = new Get(KEYS[30]);
615     get.addColumn(BYTES_FAMILY, QUALIFIER);
616     actions.add(get);
617 
618     // There used to be a 'get' of a previous put here, but removed
619     // since this API really cannot guarantee order in terms of mixed
620     // get/puts.
621 
622     // 5 put of new column
623     put = new Put(KEYS[40]);
624     put.add(BYTES_FAMILY, qual2, val2);
625     actions.add(put);
626 
627     results = table.batch(actions);
628 
629     // Validation
630 
631     validateResult(results[0]);
632     validateResult(results[1]);
633     validateEmpty(results[2]);
634     validateEmpty(results[3]);
635     validateResult(results[4]);
636     validateEmpty(results[5]);
637 
638     // validate last put, externally from the batch
639     get = new Get(KEYS[40]);
640     get.addColumn(BYTES_FAMILY, qual2);
641     Result r = table.get(get);
642     validateResult(r, qual2, val2);
643 
644     table.close();
645   }
646 
647   // // Helper methods ////
648 
validateResult(Object r)649   private void validateResult(Object r) {
650     validateResult(r, QUALIFIER, VALUE);
651   }
652 
validateResult(Object r1, byte[] qual, byte[] val)653   private void validateResult(Object r1, byte[] qual, byte[] val) {
654     Result r = (Result)r1;
655     Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
656     byte[] value = r.getValue(BYTES_FAMILY, qual);
657     if (0 != Bytes.compareTo(val, value)) {
658       fail("Expected [" + Bytes.toStringBinary(val)
659           + "] but got [" + Bytes.toStringBinary(value) + "]");
660     }
661   }
662 
constructPutRequests()663   private List<Put> constructPutRequests() {
664     List<Put> puts = new ArrayList<>();
665     for (byte[] k : KEYS) {
666       Put put = new Put(k);
667       put.add(BYTES_FAMILY, QUALIFIER, VALUE);
668       puts.add(put);
669     }
670     return puts;
671   }
672 
validateLoadedData(Table table)673   private void validateLoadedData(Table table) throws IOException {
674     // get the data back and validate that it is correct
675     LOG.info("Validating data on " + table);
676     List<Get> gets = new ArrayList<Get>();
677     for (byte[] k : KEYS) {
678       Get get = new Get(k);
679       get.addColumn(BYTES_FAMILY, QUALIFIER);
680       gets.add(get);
681     }
682     int retryNum = 10;
683     Result[] results = null;
684     do  {
685       results = table.get(gets);
686       boolean finished = true;
687       for (Result result : results) {
688         if (result.isEmpty()) {
689           finished = false;
690           break;
691         }
692       }
693       if (finished) {
694         break;
695       }
696       try {
697         Thread.sleep(10);
698       } catch (InterruptedException e) {
699       }
700       retryNum--;
701     } while (retryNum > 0);
702 
703     if (retryNum == 0) {
704       fail("Timeout for validate data");
705     } else {
706       if (results != null) {
707         for (Result r : results) {
708           Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
709           Assert.assertEquals(0, Bytes.compareTo(VALUE, r
710             .getValue(BYTES_FAMILY, QUALIFIER)));
711         }
712         LOG.info("Validating data on " + table + " successfully!");
713       }
714     }
715   }
716 
validateEmpty(Object r1)717   private void validateEmpty(Object r1) {
718     Result result = (Result)r1;
719     Assert.assertTrue(result != null);
720     Assert.assertTrue(result.getRow() == null);
721     Assert.assertEquals(0, result.rawCells().length);
722   }
723 
validateSizeAndEmpty(Object[] results, int expectedSize)724   private void validateSizeAndEmpty(Object[] results, int expectedSize) {
725     // Validate got back the same number of Result objects, all empty
726     Assert.assertEquals(expectedSize, results.length);
727     for (Object result : results) {
728       validateEmpty(result);
729     }
730   }
731 }
732