1 /* 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.client; 20 21 import static org.junit.Assert.assertEquals; 22 import static org.junit.Assert.assertFalse; 23 import static org.junit.Assert.assertTrue; 24 import static org.junit.Assert.fail; 25 26 import java.io.IOException; 27 import java.util.ArrayList; 28 import java.util.HashSet; 29 import java.util.List; 30 import java.util.concurrent.CountDownLatch; 31 import java.util.concurrent.ThreadPoolExecutor; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.hadoop.hbase.Cell; 36 import org.apache.hadoop.hbase.CellUtil; 37 import org.apache.hadoop.hbase.HBaseTestingUtility; 38 import org.apache.hadoop.hbase.HConstants; 39 import org.apache.hadoop.hbase.HRegionLocation; 40 import org.apache.hadoop.hbase.testclassification.MediumTests; 41 import org.apache.hadoop.hbase.ServerName; 42 import org.apache.hadoop.hbase.TableName; 43 import org.apache.hadoop.hbase.Waiter; 44 import org.apache.hadoop.hbase.codec.KeyValueCodec; 45 import org.apache.hadoop.hbase.exceptions.OperationConflictException; 46 import org.apache.hadoop.hbase.util.Bytes; 47 import org.apache.hadoop.hbase.util.JVMClusterUtil; 48 import org.apache.hadoop.hbase.util.Threads; 49 import org.junit.AfterClass; 50 import org.junit.Assert; 51 import org.junit.Before; 52 import org.junit.BeforeClass; 53 import org.junit.Ignore; 54 import org.junit.Test; 55 import org.junit.experimental.categories.Category; 56 57 @Category(MediumTests.class) 58 public class TestMultiParallel { 59 private static final Log LOG = LogFactory.getLog(TestMultiParallel.class); 60 61 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 62 private static final byte[] VALUE = Bytes.toBytes("value"); 63 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 64 private static final String FAMILY = "family"; 65 private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); 66 private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); 67 private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); 68 private static final byte [][] KEYS = makeKeys(); 69 70 private static final int slaves = 5; // also used for testing HTable pool size 71 private static Connection CONNECTION; 72 beforeClass()73 @BeforeClass public static void beforeClass() throws Exception { 74 // Uncomment the following lines if more verbosity is needed for 75 // debugging (see HBASE-12285 for details). 76 //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); 77 //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); 78 //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); 79 UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, 80 KeyValueCodec.class.getCanonicalName()); 81 UTIL.startMiniCluster(slaves); 82 HTable t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); 83 UTIL.waitTableEnabled(TEST_TABLE); 84 t.close(); 85 CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); 86 } 87 afterClass()88 @AfterClass public static void afterClass() throws Exception { 89 CONNECTION.close(); 90 UTIL.shutdownMiniCluster(); 91 } 92 before()93 @Before public void before() throws Exception { 94 LOG.info("before"); 95 if (UTIL.ensureSomeRegionServersAvailable(slaves)) { 96 // Distribute regions 97 UTIL.getMiniHBaseCluster().getMaster().balance(); 98 99 // Wait until completing balance 100 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); 101 } 102 LOG.info("before done"); 103 } 104 makeKeys()105 private static byte[][] makeKeys() { 106 byte [][] starterKeys = HBaseTestingUtility.KEYS; 107 // Create a "non-uniform" test set with the following characteristics: 108 // a) Unequal number of keys per region 109 110 // Don't use integer as a multiple, so that we have a number of keys that is 111 // not a multiple of the number of regions 112 int numKeys = (int) ((float) starterKeys.length * 10.33F); 113 114 List<byte[]> keys = new ArrayList<byte[]>(); 115 for (int i = 0; i < numKeys; i++) { 116 int kIdx = i % starterKeys.length; 117 byte[] k = starterKeys[kIdx]; 118 byte[] cp = new byte[k.length + 1]; 119 System.arraycopy(k, 0, cp, 0, k.length); 120 cp[k.length] = new Integer(i % 256).byteValue(); 121 keys.add(cp); 122 } 123 124 // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which 125 // should work) 126 // c) keys are not in sorted order (within a region), to ensure that the 127 // sorting code and index mapping doesn't break the functionality 128 for (int i = 0; i < 100; i++) { 129 int kIdx = i % starterKeys.length; 130 byte[] k = starterKeys[kIdx]; 131 byte[] cp = new byte[k.length + 1]; 132 System.arraycopy(k, 0, cp, 0, k.length); 133 cp[k.length] = new Integer(i % 256).byteValue(); 134 keys.add(cp); 135 } 136 return keys.toArray(new byte [][] {new byte [] {}}); 137 } 138 139 140 /** 141 * This is for testing the active number of threads that were used while 142 * doing a batch operation. It inserts one row per region via the batch 143 * operation, and then checks the number of active threads. 144 * For HBASE-3553 145 * @throws IOException 146 * @throws InterruptedException 147 * @throws NoSuchFieldException 148 * @throws SecurityException 149 */ 150 @Ignore ("Nice bug flakey... expected 5 but was 4..") @Test(timeout=300000) testActiveThreadsCount()151 public void testActiveThreadsCount() throws Exception { 152 try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) { 153 ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); 154 try { 155 try (Table t = connection.getTable(TEST_TABLE, executor)) { 156 List<Put> puts = constructPutRequests(); // creates a Put for every region 157 t.batch(puts); 158 HashSet<ServerName> regionservers = new HashSet<ServerName>(); 159 try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { 160 for (Row r : puts) { 161 HRegionLocation location = locator.getRegionLocation(r.getRow()); 162 regionservers.add(location.getServerName()); 163 } 164 } 165 assertEquals(regionservers.size(), executor.getLargestPoolSize()); 166 } 167 } finally { 168 executor.shutdownNow(); 169 } 170 } 171 } 172 173 @Test(timeout=300000) testBatchWithGet()174 public void testBatchWithGet() throws Exception { 175 LOG.info("test=testBatchWithGet"); 176 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); 177 178 // load test data 179 List<Put> puts = constructPutRequests(); 180 table.batch(puts); 181 182 // create a list of gets and run it 183 List<Row> gets = new ArrayList<Row>(); 184 for (byte[] k : KEYS) { 185 Get get = new Get(k); 186 get.addColumn(BYTES_FAMILY, QUALIFIER); 187 gets.add(get); 188 } 189 Result[] multiRes = new Result[gets.size()]; 190 table.batch(gets, multiRes); 191 192 // Same gets using individual call API 193 List<Result> singleRes = new ArrayList<Result>(); 194 for (Row get : gets) { 195 singleRes.add(table.get((Get) get)); 196 } 197 // Compare results 198 Assert.assertEquals(singleRes.size(), multiRes.length); 199 for (int i = 0; i < singleRes.size(); i++) { 200 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); 201 Cell[] singleKvs = singleRes.get(i).rawCells(); 202 Cell[] multiKvs = multiRes[i].rawCells(); 203 for (int j = 0; j < singleKvs.length; j++) { 204 Assert.assertEquals(singleKvs[j], multiKvs[j]); 205 Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), 206 CellUtil.cloneValue(multiKvs[j]))); 207 } 208 } 209 table.close(); 210 } 211 212 @Test testBadFam()213 public void testBadFam() throws Exception { 214 LOG.info("test=testBadFam"); 215 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); 216 217 List<Row> actions = new ArrayList<Row>(); 218 Put p = new Put(Bytes.toBytes("row1")); 219 p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value")); 220 actions.add(p); 221 p = new Put(Bytes.toBytes("row2")); 222 p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value")); 223 actions.add(p); 224 225 // row1 and row2 should be in the same region. 226 227 Object [] r = new Object[actions.size()]; 228 try { 229 table.batch(actions, r); 230 fail(); 231 } catch (RetriesExhaustedWithDetailsException ex) { 232 LOG.debug(ex); 233 // good! 234 assertFalse(ex.mayHaveClusterIssues()); 235 } 236 assertEquals(2, r.length); 237 assertTrue(r[0] instanceof Throwable); 238 assertTrue(r[1] instanceof Result); 239 table.close(); 240 } 241 242 @Test (timeout=300000) testFlushCommitsNoAbort()243 public void testFlushCommitsNoAbort() throws Exception { 244 LOG.info("test=testFlushCommitsNoAbort"); 245 doTestFlushCommits(false); 246 } 247 248 /** 249 * Only run one Multi test with a forced RegionServer abort. Otherwise, the 250 * unit tests will take an unnecessarily long time to run. 251 * 252 * @throws Exception 253 */ 254 @Test (timeout=360000) testFlushCommitsWithAbort()255 public void testFlushCommitsWithAbort() throws Exception { 256 LOG.info("test=testFlushCommitsWithAbort"); 257 doTestFlushCommits(true); 258 } 259 260 /** 261 * Set table auto flush to false and test flushing commits 262 * @param doAbort true if abort one regionserver in the testing 263 * @throws Exception 264 */ doTestFlushCommits(boolean doAbort)265 private void doTestFlushCommits(boolean doAbort) throws Exception { 266 // Load the data 267 LOG.info("get new table"); 268 Table table = UTIL.getConnection().getTable(TEST_TABLE); 269 table.setWriteBufferSize(10 * 1024 * 1024); 270 271 LOG.info("constructPutRequests"); 272 List<Put> puts = constructPutRequests(); 273 table.put(puts); 274 LOG.info("puts"); 275 final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads() 276 .size(); 277 assert liveRScount > 0; 278 JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster() 279 .getLiveRegionServerThreads().get(0); 280 if (doAbort) { 281 liveRS.getRegionServer().abort("Aborting for tests", 282 new Exception("doTestFlushCommits")); 283 // If we wait for no regions being online after we abort the server, we 284 // could ensure the master has re-assigned the regions on killed server 285 // after writing successfully. It means the server we aborted is dead 286 // and detected by matser 287 while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) { 288 Thread.sleep(100); 289 } 290 // try putting more keys after the abort. same key/qual... just validating 291 // no exceptions thrown 292 puts = constructPutRequests(); 293 table.put(puts); 294 } 295 296 LOG.info("validating loaded data"); 297 validateLoadedData(table); 298 299 // Validate server and region count 300 List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads(); 301 int count = 0; 302 for (JVMClusterUtil.RegionServerThread t: liveRSs) { 303 count++; 304 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer()); 305 } 306 LOG.info("Count=" + count); 307 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, 308 (doAbort ? (liveRScount - 1) : liveRScount), count); 309 if (doAbort) { 310 UTIL.getMiniHBaseCluster().waitOnRegionServer(0); 311 UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() { 312 @Override 313 public boolean evaluate() throws Exception { 314 return UTIL.getMiniHBaseCluster().getMaster() 315 .getClusterStatus().getServersSize() == (liveRScount - 1); 316 } 317 }); 318 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); 319 } 320 321 table.close(); 322 LOG.info("done"); 323 } 324 325 @Test (timeout=300000) testBatchWithPut()326 public void testBatchWithPut() throws Exception { 327 LOG.info("test=testBatchWithPut"); 328 Table table = CONNECTION.getTable(TEST_TABLE); 329 // put multiple rows using a batch 330 List<Put> puts = constructPutRequests(); 331 332 Object[] results = table.batch(puts); 333 validateSizeAndEmpty(results, KEYS.length); 334 335 if (true) { 336 int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size(); 337 assert liveRScount > 0; 338 JVMClusterUtil.RegionServerThread liveRS = 339 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0); 340 liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut")); 341 puts = constructPutRequests(); 342 try { 343 results = table.batch(puts); 344 } catch (RetriesExhaustedWithDetailsException ree) { 345 LOG.info(ree.getExhaustiveDescription()); 346 table.close(); 347 throw ree; 348 } 349 validateSizeAndEmpty(results, KEYS.length); 350 } 351 352 validateLoadedData(table); 353 table.close(); 354 } 355 356 @Test(timeout=300000) testBatchWithDelete()357 public void testBatchWithDelete() throws Exception { 358 LOG.info("test=testBatchWithDelete"); 359 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); 360 361 // Load some data 362 List<Put> puts = constructPutRequests(); 363 Object[] results = table.batch(puts); 364 validateSizeAndEmpty(results, KEYS.length); 365 366 // Deletes 367 List<Row> deletes = new ArrayList<Row>(); 368 for (int i = 0; i < KEYS.length; i++) { 369 Delete delete = new Delete(KEYS[i]); 370 delete.addFamily(BYTES_FAMILY); 371 deletes.add(delete); 372 } 373 results = table.batch(deletes); 374 validateSizeAndEmpty(results, KEYS.length); 375 376 // Get to make sure ... 377 for (byte[] k : KEYS) { 378 Get get = new Get(k); 379 get.addColumn(BYTES_FAMILY, QUALIFIER); 380 Assert.assertFalse(table.exists(get)); 381 } 382 table.close(); 383 } 384 385 @Test(timeout=300000) testHTableDeleteWithList()386 public void testHTableDeleteWithList() throws Exception { 387 LOG.info("test=testHTableDeleteWithList"); 388 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); 389 390 // Load some data 391 List<Put> puts = constructPutRequests(); 392 Object[] results = table.batch(puts); 393 validateSizeAndEmpty(results, KEYS.length); 394 395 // Deletes 396 ArrayList<Delete> deletes = new ArrayList<Delete>(); 397 for (int i = 0; i < KEYS.length; i++) { 398 Delete delete = new Delete(KEYS[i]); 399 delete.deleteFamily(BYTES_FAMILY); 400 deletes.add(delete); 401 } 402 table.delete(deletes); 403 Assert.assertTrue(deletes.isEmpty()); 404 405 // Get to make sure ... 406 for (byte[] k : KEYS) { 407 Get get = new Get(k); 408 get.addColumn(BYTES_FAMILY, QUALIFIER); 409 Assert.assertFalse(table.exists(get)); 410 } 411 table.close(); 412 } 413 414 @Test(timeout=300000) testBatchWithManyColsInOneRowGetAndPut()415 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { 416 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut"); 417 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); 418 419 List<Row> puts = new ArrayList<Row>(); 420 for (int i = 0; i < 100; i++) { 421 Put put = new Put(ONE_ROW); 422 byte[] qual = Bytes.toBytes("column" + i); 423 put.add(BYTES_FAMILY, qual, VALUE); 424 puts.add(put); 425 } 426 Object[] results = table.batch(puts); 427 428 // validate 429 validateSizeAndEmpty(results, 100); 430 431 // get the data back and validate that it is correct 432 List<Row> gets = new ArrayList<Row>(); 433 for (int i = 0; i < 100; i++) { 434 Get get = new Get(ONE_ROW); 435 byte[] qual = Bytes.toBytes("column" + i); 436 get.addColumn(BYTES_FAMILY, qual); 437 gets.add(get); 438 } 439 440 Object[] multiRes = table.batch(gets); 441 442 int idx = 0; 443 for (Object r : multiRes) { 444 byte[] qual = Bytes.toBytes("column" + idx); 445 validateResult(r, qual, VALUE); 446 idx++; 447 } 448 table.close(); 449 } 450 451 @Test(timeout=300000) testBatchWithIncrementAndAppend()452 public void testBatchWithIncrementAndAppend() throws Exception { 453 LOG.info("test=testBatchWithIncrementAndAppend"); 454 final byte[] QUAL1 = Bytes.toBytes("qual1"); 455 final byte[] QUAL2 = Bytes.toBytes("qual2"); 456 final byte[] QUAL3 = Bytes.toBytes("qual3"); 457 final byte[] QUAL4 = Bytes.toBytes("qual4"); 458 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); 459 Delete d = new Delete(ONE_ROW); 460 table.delete(d); 461 Put put = new Put(ONE_ROW); 462 put.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc")); 463 put.add(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L)); 464 table.put(put); 465 466 Increment inc = new Increment(ONE_ROW); 467 inc.addColumn(BYTES_FAMILY, QUAL2, 1); 468 inc.addColumn(BYTES_FAMILY, QUAL3, 1); 469 470 Append a = new Append(ONE_ROW); 471 a.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("def")); 472 a.add(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz")); 473 List<Row> actions = new ArrayList<Row>(); 474 actions.add(inc); 475 actions.add(a); 476 477 Object[] multiRes = table.batch(actions); 478 validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef")); 479 validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz")); 480 validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L)); 481 validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L)); 482 table.close(); 483 } 484 485 @Test(timeout=300000) testNonceCollision()486 public void testNonceCollision() throws Exception { 487 LOG.info("test=testNonceCollision"); 488 final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); 489 Table table = connection.getTable(TEST_TABLE); 490 Put put = new Put(ONE_ROW); 491 put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L)); 492 493 // Replace nonce manager with the one that returns each nonce twice. 494 NonceGenerator cnm = new PerClientRandomNonceGenerator() { 495 long lastNonce = -1; 496 @Override 497 public synchronized long newNonce() { 498 long nonce = 0; 499 if (lastNonce == -1) { 500 lastNonce = nonce = super.newNonce(); 501 } else { 502 nonce = lastNonce; 503 lastNonce = -1L; 504 } 505 return nonce; 506 } 507 }; 508 509 NonceGenerator oldCnm = 510 ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm); 511 512 // First test sequential requests. 513 try { 514 Increment inc = new Increment(ONE_ROW); 515 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 516 table.increment(inc); 517 inc = new Increment(ONE_ROW); 518 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 519 try { 520 table.increment(inc); 521 fail("Should have thrown an exception"); 522 } catch (OperationConflictException ex) { 523 } 524 Get get = new Get(ONE_ROW); 525 get.addColumn(BYTES_FAMILY, QUALIFIER); 526 Result result = table.get(get); 527 validateResult(result, QUALIFIER, Bytes.toBytes(1L)); 528 529 // Now run a bunch of requests in parallel, exactly half should succeed. 530 int numRequests = 40; 531 final CountDownLatch startedLatch = new CountDownLatch(numRequests); 532 final CountDownLatch startLatch = new CountDownLatch(1); 533 final CountDownLatch doneLatch = new CountDownLatch(numRequests); 534 for (int i = 0; i < numRequests; ++i) { 535 Runnable r = new Runnable() { 536 @Override 537 public void run() { 538 Table table = null; 539 try { 540 table = connection.getTable(TEST_TABLE); 541 } catch (IOException e) { 542 fail("Not expected"); 543 } 544 Increment inc = new Increment(ONE_ROW); 545 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); 546 startedLatch.countDown(); 547 try { 548 startLatch.await(); 549 } catch (InterruptedException e) { 550 fail("Not expected"); 551 } 552 try { 553 table.increment(inc); 554 } catch (OperationConflictException ex) { // Some threads are expected to fail. 555 } catch (IOException ioEx) { 556 fail("Not expected"); 557 } 558 doneLatch.countDown(); 559 } 560 }; 561 Threads.setDaemonThreadRunning(new Thread(r)); 562 } 563 startedLatch.await(); // Wait until all threads are ready... 564 startLatch.countDown(); // ...and unleash the herd! 565 doneLatch.await(); 566 // Now verify 567 get = new Get(ONE_ROW); 568 get.addColumn(BYTES_FAMILY, QUALIFIER); 569 result = table.get(get); 570 validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L)); 571 table.close(); 572 } finally { 573 ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm); 574 } 575 } 576 577 @Test(timeout=300000) testBatchWithMixedActions()578 public void testBatchWithMixedActions() throws Exception { 579 LOG.info("test=testBatchWithMixedActions"); 580 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); 581 582 // Load some data to start 583 Object[] results = table.batch(constructPutRequests()); 584 validateSizeAndEmpty(results, KEYS.length); 585 586 // Batch: get, get, put(new col), delete, get, get of put, get of deleted, 587 // put 588 List<Row> actions = new ArrayList<Row>(); 589 590 byte[] qual2 = Bytes.toBytes("qual2"); 591 byte[] val2 = Bytes.toBytes("putvalue2"); 592 593 // 0 get 594 Get get = new Get(KEYS[10]); 595 get.addColumn(BYTES_FAMILY, QUALIFIER); 596 actions.add(get); 597 598 // 1 get 599 get = new Get(KEYS[11]); 600 get.addColumn(BYTES_FAMILY, QUALIFIER); 601 actions.add(get); 602 603 // 2 put of new column 604 Put put = new Put(KEYS[10]); 605 put.add(BYTES_FAMILY, qual2, val2); 606 actions.add(put); 607 608 // 3 delete 609 Delete delete = new Delete(KEYS[20]); 610 delete.deleteFamily(BYTES_FAMILY); 611 actions.add(delete); 612 613 // 4 get 614 get = new Get(KEYS[30]); 615 get.addColumn(BYTES_FAMILY, QUALIFIER); 616 actions.add(get); 617 618 // There used to be a 'get' of a previous put here, but removed 619 // since this API really cannot guarantee order in terms of mixed 620 // get/puts. 621 622 // 5 put of new column 623 put = new Put(KEYS[40]); 624 put.add(BYTES_FAMILY, qual2, val2); 625 actions.add(put); 626 627 results = table.batch(actions); 628 629 // Validation 630 631 validateResult(results[0]); 632 validateResult(results[1]); 633 validateEmpty(results[2]); 634 validateEmpty(results[3]); 635 validateResult(results[4]); 636 validateEmpty(results[5]); 637 638 // validate last put, externally from the batch 639 get = new Get(KEYS[40]); 640 get.addColumn(BYTES_FAMILY, qual2); 641 Result r = table.get(get); 642 validateResult(r, qual2, val2); 643 644 table.close(); 645 } 646 647 // // Helper methods //// 648 validateResult(Object r)649 private void validateResult(Object r) { 650 validateResult(r, QUALIFIER, VALUE); 651 } 652 validateResult(Object r1, byte[] qual, byte[] val)653 private void validateResult(Object r1, byte[] qual, byte[] val) { 654 Result r = (Result)r1; 655 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual)); 656 byte[] value = r.getValue(BYTES_FAMILY, qual); 657 if (0 != Bytes.compareTo(val, value)) { 658 fail("Expected [" + Bytes.toStringBinary(val) 659 + "] but got [" + Bytes.toStringBinary(value) + "]"); 660 } 661 } 662 constructPutRequests()663 private List<Put> constructPutRequests() { 664 List<Put> puts = new ArrayList<>(); 665 for (byte[] k : KEYS) { 666 Put put = new Put(k); 667 put.add(BYTES_FAMILY, QUALIFIER, VALUE); 668 puts.add(put); 669 } 670 return puts; 671 } 672 validateLoadedData(Table table)673 private void validateLoadedData(Table table) throws IOException { 674 // get the data back and validate that it is correct 675 LOG.info("Validating data on " + table); 676 List<Get> gets = new ArrayList<Get>(); 677 for (byte[] k : KEYS) { 678 Get get = new Get(k); 679 get.addColumn(BYTES_FAMILY, QUALIFIER); 680 gets.add(get); 681 } 682 int retryNum = 10; 683 Result[] results = null; 684 do { 685 results = table.get(gets); 686 boolean finished = true; 687 for (Result result : results) { 688 if (result.isEmpty()) { 689 finished = false; 690 break; 691 } 692 } 693 if (finished) { 694 break; 695 } 696 try { 697 Thread.sleep(10); 698 } catch (InterruptedException e) { 699 } 700 retryNum--; 701 } while (retryNum > 0); 702 703 if (retryNum == 0) { 704 fail("Timeout for validate data"); 705 } else { 706 if (results != null) { 707 for (Result r : results) { 708 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); 709 Assert.assertEquals(0, Bytes.compareTo(VALUE, r 710 .getValue(BYTES_FAMILY, QUALIFIER))); 711 } 712 LOG.info("Validating data on " + table + " successfully!"); 713 } 714 } 715 } 716 validateEmpty(Object r1)717 private void validateEmpty(Object r1) { 718 Result result = (Result)r1; 719 Assert.assertTrue(result != null); 720 Assert.assertTrue(result.getRow() == null); 721 Assert.assertEquals(0, result.rawCells().length); 722 } 723 validateSizeAndEmpty(Object[] results, int expectedSize)724 private void validateSizeAndEmpty(Object[] results, int expectedSize) { 725 // Validate got back the same number of Result objects, all empty 726 Assert.assertEquals(expectedSize, results.length); 727 for (Object result : results) { 728 validateEmpty(result); 729 } 730 } 731 } 732