/*
   Copyright (c) 2010, 2018, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/

package testsuite.clusterj;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

import com.mysql.clusterj.ClusterJException;
import com.mysql.clusterj.ClusterJUserException;
import com.mysql.clusterj.Query;
import com.mysql.clusterj.Session;
import com.mysql.clusterj.query.QueryDomainType;

import testsuite.clusterj.model.Customer;
import testsuite.clusterj.model.Order;
import testsuite.clusterj.model.OrderLine;

/** Calls {@code sessionFactory.reconnect(5)} while one "misbehaving" thread and
 * {@code numberOfThreads} worker threads are using sessions obtained from the
 * same session factory, then verifies the instance counters and that every
 * order value equals the sum of its order line values.
 */
@org.junit.Ignore("Bug#28550140 : disable test until diagnosis of failure")
public class ReconnectTest extends AbstractClusterJModelTest {

    /** Message fragment that identifies a "session factory is closed" user exception. */
    private static final String SESSION_FACTORY_NOT_OPEN = "SessionFactory is not open";

    @Override
    protected boolean getDebug() {
        return false;
    }

    private int numberOfThreads = 30;
    private int numberOfNewCustomersPerThread = 5;
    private int numberOfNewOrdersPerNewCustomer = 5;
    private int numberOfUpdatesPerThread = 2;

    private int maximumOrderLinesPerOrder = 5;
    private int maximumQuantityPerOrderLine = 100;
    private int maximumUnitPrice = 100;

    private int numberOfInitialCustomers = 10;
    // the id counters are guarded by the corresponding collection's monitor
    private int nextCustomerId = numberOfInitialCustomers;
    private int nextOrderId = 0;
    private int nextOrderLineId = 0;

    private int numberOfUpdatedOrderLines = 0;
    private int numberOfDeletedOrders = 0;
    private int numberOfDeletedOrderLines = 0;

    private ThreadGroup threadGroup;

    // member variables for synchronization between threads
    private int numberOfThreadsReady = 0;
    final private Object numberOfThreadsReadySync = new Object();

    /** Record that one more thread is running and wake up any waiter. */
    private void incrementNumberOfThreadsReady() {
        synchronized (numberOfThreadsReadySync) {
            numberOfThreadsReady++;
            numberOfThreadsReadySync.notifyAll();
        }
    }

    /** Block until at least {@code value} threads have reported that they are ready.
     * An interrupt does not end the wait; the interrupt status is restored once
     * the condition has been met so that callers can still observe it.
     * @param value the minimum number of ready threads to wait for
     */
    private void waitUntilAtleastNumberOfThreadsReady(int value) {
        boolean interrupted = false;
        synchronized (numberOfThreadsReadySync) {
            while (numberOfThreadsReady < value) {
                logger.warn("Waiting on " + value);
                try {
                    numberOfThreadsReadySync.wait();
                } catch (InterruptedException ie) {
                    // re-interrupting here would make the next wait() throw immediately
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Customers */
    List<Customer> customers = new ArrayList<Customer>();

    /** Orders */
    List<Order> orders = new ArrayList<Order>();

    /** Order lines, ordered by id */
    Set<OrderLine> orderlines = new TreeSet<OrderLine>(
            new Comparator<OrderLine>() {
                @Override
                public int compare(OrderLine o1, OrderLine o2) {
                    // Integer.compare cannot overflow, unlike subtracting the ids
                    return Integer.compare(o1.getId(), o2.getId());
                }
            }
    );

    @Override
    public void localSetUp() {
        createSessionFactory();
        session = sessionFactory.getSession();
        // first delete all customers, orders, and order lines
        tx = session.currentTransaction();
        tx.begin();
        session.deletePersistentAll(Customer.class);
        session.deletePersistentAll(Order.class);
        session.deletePersistentAll(OrderLine.class);
        tx.commit();
        // start out with some customers
        createCustomerInstances(nextCustomerId);
        // add new customer instances
        tx.begin();
        session.makePersistentAll(customers);
        tx.commit();
        // get rid of them when we're done
        addTearDownClasses(Customer.class);
        addTearDownClasses(Order.class);
        addTearDownClasses(OrderLine.class);
        session.close();
    }

    /** Sleep for the given time; if interrupted, return early with the
     * interrupt status of the current thread restored.
     * @param millis the number of milliseconds to sleep
     */
    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Create the initial customers (ids 0 .. numberToCreate - 1) and add them
     * to the customers collection; they are not made persistent here.
     * @param numberToCreate the number of customers to create
     */
    private void createCustomerInstances(int numberToCreate) {
        for (int i = 0; i < numberToCreate; ++i) {
            Customer customer = session.newInstance(Customer.class);
            customer.setId(i);
            customer.setName("Customer number " + i + " (initial)");
            customer.setMagic(i * 100);
            customers.add(customer);
        }
    }

    /** Tell whether the exception reports that the session factory is not open.
     * @param cjue the exception to inspect
     * @return true if the message is non-null and contains the "not open" marker
     */
    private static boolean isSessionFactoryNotOpen(ClusterJUserException cjue) {
        String message = cjue.getMessage();
        return message != null && message.contains(SESSION_FACTORY_NOT_OPEN);
    }

    /** The test method creates numberOfThreads threads and starts them.
     * Once the threads are started, the main thread tells the session factory
     * to reconnect and then waits until all threads complete.
     * The main thread then checks that the proper number of instances are
     * created in the database and verifies that all orders are consistent
     * with their order lines. Inconsistency might be due to thread interaction
     * or improper database updates.
     */
    public void test() {
        List<Thread> threads = new ArrayList<Thread>();
        // create thread group
        threadGroup = new ThreadGroup("Stuff");
        // create uncaught exception handler
        MyUncaughtExceptionHandler uncaughtExceptionHandler = new MyUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
        // create and start the thread that misbehaves
        Thread misbehaving = new Thread(threadGroup, new Misbehaving());
        misbehaving.start();
        // wait for it to begin
        waitUntilAtleastNumberOfThreadsReady(1);
        // create all normal threads
        for (int i = 0; i < numberOfThreads; ++i) {
            threads.add(new Thread(threadGroup, new StuffToDo()));
        }
        // start all threads
        for (Thread thread : threads) {
            thread.start();
        }
        // wait until at least one StuffToDo thread is ready
        waitUntilAtleastNumberOfThreadsReady(2);
        // tell the SessionFactory to reconnect
        sessionFactory.reconnect(5);
        // wait until all threads have finished
        threads.add(misbehaving);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while joining threads.", e);
            }
        }
        // if any uncaught exceptions (from threads) signal an error
        for (Throwable thrown : uncaughtExceptionHandler.getUncaughtExceptions()) {
            error("Caught exception: " + thrown.getClass().getName() + ": " + thrown.getMessage());
            for (StackTraceElement element : thrown.getStackTrace()) {
                error("        at " + element.toString());
            }
        }
        // summarize for the record
        if (retryCount < 5) {
            error("Retry count too low: " + retryCount);
        }
        if (getDebug()) {
            System.out.println("Retry count: " + retryCount);
            System.out.println("Number of threads: " + numberOfThreads +
                    "; number of new customers per thread: " + numberOfNewCustomersPerThread +
                    "; number of orders per new customer: " + numberOfNewOrdersPerNewCustomer);
            System.out.println("Created " + nextCustomerId + " customers; " +
                    nextOrderId + " orders; and " + nextOrderLineId + " order lines.");
            System.out.println("Deleted " + numberOfDeletedOrders + " orders; and " +
                    numberOfDeletedOrderLines + " order lines.");
            System.out.println("Updated " + numberOfUpdatedOrderLines + " order lines.");
        }
        errorIfNotEqual("Failed to create customers.",
                numberOfThreads * numberOfNewCustomersPerThread + numberOfInitialCustomers, nextCustomerId);
        errorIfNotEqual("Failed to create orders. ",
                numberOfThreads * numberOfNewCustomersPerThread * numberOfNewOrdersPerNewCustomer, nextOrderId);
        // double check the orders to make sure they were updated correctly;
        // the whole pass is retried if anything is thrown while verifying
        boolean done = false;
        while (!done) {
            if (getDebug()) { System.out.println("verifying..."); }
            try (Session session = sessionFactory.getSession()) {
                QueryDomainType<OrderLine> queryOrderType =
                        session.getQueryBuilder().createQueryDefinition(OrderLine.class);
                queryOrderType.where(queryOrderType.get("orderId").equal(queryOrderType.param("orderId")));
                if (getDebug()) { System.out.println("checking orders: " + orders.size()); }
                for (Order order : orders) {
                    int orderId = order.getId();
                    if (getDebug()) {
                        System.out.println("Read order " + orderId + " total " + order.getValue());
                    }
                    // replace order with its persistent representation
                    Order persistentOrder = session.find(Order.class, orderId);
                    if (persistentOrder == null) {
                        // report it instead of failing with a NullPointerException
                        // that the catch block below would turn into an endless retry
                        error("Order " + orderId + " was not found in the database.");
                        continue;
                    }
                    double expectedTotal = persistentOrder.getValue();
                    double actualTotal = 0.0d;
                    StringBuilder messages = new StringBuilder();
                    for (OrderLine orderLine : getOrderLines(session, queryOrderType, orderId)) {
                        String message = "order " + orderLine.getOrderId() +
                                " orderline " + orderLine.getId() + " value " + orderLine.getTotalValue();
                        if (getDebug()) { System.out.println(message); }
                        messages.append(message);
                        messages.append('\n');
                        actualTotal += orderLine.getTotalValue();
                    }
                    errorIfNotEqual("For order " + orderId + ", order value does not equal sum of order line values."
                            + " orderLines: \n" + messages.toString(),
                            expectedTotal, actualTotal);
                }
                done = true;
            } catch (Throwable t) {
                if (getDebug()) { System.out.println("summarize for the record caught " + t.getMessage()); }
                sleep(1000);
            }
        }
        failOnError();
    }

    private int retryCount = 0;

    /** Count one retry caused by a closed session factory (multithread safe). */
    private void incrementRetryCount() {
        synchronized (this) {
            ++retryCount;
        }
    }

    /** A runnable that begins a transaction and then repeatedly executes a query
     * on the same session, without ever committing or closing it, until a
     * ClusterJException is thrown.
     */
    class Misbehaving implements Runnable {
        @Override
        public void run() {
            // NOTE(review): the session is never closed; presumably deliberate
            // for the "misbehaving" client - confirm before changing
            Session session = sessionFactory.getSession();
            session.currentTransaction().begin();
            boolean done = false;
            QueryDomainType<OrderLine> queryOrderType;
            // increment status to indicate we are running
            incrementNumberOfThreadsReady();
            while (!done) {
                try {
                    queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class);
                    queryOrderType.where(queryOrderType.get("orderId").greaterThan(
                            (queryOrderType.param("orderId"))));
                    Query<OrderLine> queryOrder = session.createQuery(queryOrderType);
                    queryOrder.setParameter("orderId", 0);
                    queryOrder.getResultList();
                    sleep(100);
                } catch (ClusterJException cje) {
                    // the exception might be any of several exceptions when disconnecting/reconnecting
                    done = true;
                }
            }
        }
    }

    /** This class implements the logic per thread. For each thread created,
     * the run method is invoked.
     * Each thread uses its own sessions and shares the customer, order, and order line
     * collections. Collections are synchronized to avoid threading conflicts.
     * Each thread creates numberOfNewCustomersPerThread customers, each of which
     * contains numberOfNewOrdersPerNewCustomer orders, each of which contains a
     * random number of order lines.
     * Each thread then updates numberOfUpdatesPerThread orders by changing one
     * order line.
     * Each thread then deletes one order and its associated order lines.
     * Every step is retried when a ClusterJUserException is caught.
     */
    class StuffToDo implements Runnable {

        private Random myRandom = new Random();

        /** Common handling of a ClusterJUserException caught in one of the retry loops:
         * if the session factory is not open, optionally count a retry and back off.
         * @param phase the description of the step that failed, used in the debug message
         * @param cjue the exception that was caught
         * @param countRetry whether the retry counter is incremented
         */
        private void handleFailure(String phase, ClusterJUserException cjue, boolean countRetry) {
            if (getDebug()) { System.out.println("StuffToDo: " + phase + " caught " + cjue.getMessage()); }
            if (isSessionFactoryNotOpen(cjue)) {
                if (countRetry) {
                    incrementRetryCount();
                }
                sleep(300);
            }
        }

        @Override
        public void run() {
            // get a session for the queryOrderType
            QueryDomainType<OrderLine> queryOrderType = null;
            boolean done = false;
            while (!done) {
                // the session is closed by try-with-resources; no explicit close is needed
                try (Session session = sessionFactory.getSession()) {
                    queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class);
                    queryOrderType.where(queryOrderType.get("orderId").equal(queryOrderType.param("orderId")));
                    done = true;
                } catch (ClusterJUserException cjue) {
                    handleFailure("query orderId", cjue, false);
                }
            }
            // tell that this thread is ready
            incrementNumberOfThreadsReady();
            int i = 0;
            while (i < numberOfNewCustomersPerThread) {
                // create a new customer
                try (Session localSession = sessionFactory.getSession()) {
                    List<Order> newOrders = new ArrayList<Order>(numberOfNewOrdersPerNewCustomer);
                    localSession.currentTransaction().begin();
                    Customer newCustomer =
                            createCustomer(localSession, String.valueOf(Thread.currentThread().getId()));
                    int customerId = newCustomer.getId();
                    for (int j = 0; j < numberOfNewOrdersPerNewCustomer; ++j) {
                        // create a new order for the customer
                        newOrders.add(createOrder(localSession, customerId, myRandom));
                    }
                    // counted before the commit, so an attempt whose commit throws
                    // is not repeated (its ids have already been handed out)
                    ++i;
                    localSession.currentTransaction().commit();
                    // add new customer and orders only if successful
                    addCustomer(newCustomer);
                    addOrders(newOrders);
                } catch (ClusterJUserException cjue) {
                    handleFailure("create customer", cjue, true);
                }
            }
            // update orders
            i = 0;
            while (i < numberOfUpdatesPerThread) {
                try (Session localSession = sessionFactory.getSession()) {
                    // update an order
                    localSession.currentTransaction().begin();
                    Order order = updateOrder(localSession, myRandom, queryOrderType);
                    localSession.currentTransaction().commit();
                    // put the updated order back; updateOrder returns null when
                    // there was nothing to update, and null must not enter the list
                    if (order != null) {
                        addOrder(order);
                    }
                    ++i;
                } catch (ClusterJUserException cjue) {
                    handleFailure("update orders", cjue, true);
                }
            }
            // delete an order
            done = false;
            while (!done) {
                try (Session localSession = sessionFactory.getSession()) {
                    // delete an order and all of its order lines
                    localSession.currentTransaction().begin();
                    deleteOrder(localSession, myRandom, queryOrderType);
                    localSession.currentTransaction().commit();
                    done = true;
                } catch (ClusterJUserException cjue) {
                    handleFailure("delete order", cjue, true);
                }
            }
        }
    }

    /** Create a new customer.
     * @param session the session
     * @param threadId the thread id of the creating thread
     * @return the new customer
     */
    private Customer createCustomer(Session session, String threadId) {
        Customer customer = session.newInstance(Customer.class);
        int id = getNextCustomerId();
        customer.setId(id);
        customer.setName("Customer number " + id + " thread " + threadId);
        customer.setMagic(id * 10000);
        session.makePersistent(customer);
        return customer;
    }

    /** Create a new order for a specific customer with a random number of order lines
     * and a random unit price and quantity.
     * @param session the session
     * @param customerId the id of the customer
     * @param random the random number generator
     * @return the new order
     */
    public Order createOrder(Session session, int customerId, Random random) {
        // get an order number
        int orderid = getNextOrderId();
        Order order = session.newInstance(Order.class);
        order.setId(orderid);
        order.setCustomerId(customerId);
        order.setDescription("Order " + orderid + " for Customer " + customerId);
        double orderValue = 0.0d;
        // now create some order lines
        int numberOfOrderLines = random.nextInt(maximumOrderLinesPerOrder) + 1;
        if (getDebug()) {
            System.out.println("Create Order " + orderid
                    + " with numberOfOrderLines: " + numberOfOrderLines);
        }
        for (int i = 0; i < numberOfOrderLines; ++i) {
            int orderLineNumber = getNextOrderLineId();
            OrderLine orderLine = session.newInstance(OrderLine.class);
            orderLine.setId(orderLineNumber);
            orderLine.setOrderId(orderid);
            long quantity = random.nextInt(maximumQuantityPerOrderLine) + 1;
            orderLine.setQuantity(quantity);
            float unitPrice = (1.0f + (float)random.nextInt(maximumUnitPrice)) / 4;
            orderLine.setUnitPrice(unitPrice);
            double orderLineValue = unitPrice * quantity;
            orderValue += orderLineValue;
            if (getDebug()) {
                System.out.println("Create orderline " + orderLineNumber + " for Order " + orderid
                        + " quantity " + quantity + " price " + unitPrice
                        + " order line value " + orderLineValue + " order value " + orderValue);
            }
            orderLine.setTotalValue(orderLineValue);
            addOrderLine(orderLine);
            session.persist(orderLine);
        }
        order.setValue(orderValue);
        session.persist(order);
        return order;
    }

    /** Update an order by changing one of its order lines (if it has any).
     * The order is removed from the orders collection so that no other thread
     * picks it; the caller puts the returned order back after committing.
     * @param session the session
     * @param random a random number generator
     * @param queryOrderType the query definition to query for OrderLines by orderId
     * @return the updated persistent order, or null if no order was available
     * or the chosen order was not found in the database
     */
    public Order updateOrder(Session session, Random random, QueryDomainType<OrderLine> queryOrderType) {
        // pick an order to update; prevent anyone else from updating the same order
        Order order = removeOrderFromOrdersCollection(random);
        if (order == null) { return null; }
        int orderId = order.getId();
        // replace order with its persistent representation
        order = session.find(Order.class, orderId);
        if (order == null) { return null; }
        List<OrderLine> orderLines = getOrderLines(session, queryOrderType, orderId);
        int numberOfOrderLines = orderLines.size();
        double orderValue = order.getValue();
        if (getDebug()) { System.out.println("updateOrder previous orderValue: " + orderValue); }
        if (numberOfOrderLines > 0) {
            int index = random.nextInt(numberOfOrderLines);
            OrderLine orderLine = orderLines.get(index);
            orderValue -= orderLine.getTotalValue();
            updateOrderLine(orderLine, random);
            orderValue += orderLine.getTotalValue();
            // only an order line that was actually picked can be updated
            session.updatePersistent(orderLine);
        }
        if (getDebug()) { System.out.println("updateOrder updated orderValue: " + orderValue); }
        order.setValue(orderValue);
        session.updatePersistent(order);
        // return order so it can be put back after committing the transaction
        return order;
    }

    /** Update an order line by randomly changing unit price and quantity.
     * @param orderLine the order line to update
     * @param random a random number generator
     */
    private void updateOrderLine(OrderLine orderLine, Random random) {
        int orderid = orderLine.getOrderId();
        int orderLineNumber = orderLine.getId();
        double previousValue = orderLine.getTotalValue();
        long quantity = random.nextInt(maximumQuantityPerOrderLine);
        orderLine.setQuantity(quantity);
        float unitPrice = ((float)random.nextInt(maximumUnitPrice)) / 4;
        orderLine.setUnitPrice(unitPrice);
        double orderLineValue = unitPrice * quantity;
        orderLine.setTotalValue(orderLineValue);
        if (getDebug()) {
            System.out.println("For order " + orderid + " orderline " + orderLineNumber +
                    " previous order line value " + previousValue + " new order line value " + orderLineValue);
        }
        synchronized (orderlines) {
            ++numberOfUpdatedOrderLines;
        }
    }

    /** Delete an order and its order lines from the database.
     * Does nothing if no order is available or the chosen order is not found.
     * @param session the session
     * @param random a random number generator
     * @param queryOrderType the query definition to query for OrderLines by orderId
     */
    public void deleteOrder(Session session, Random random, QueryDomainType<OrderLine> queryOrderType) {
        // pick an order to delete
        Order order = removeOrderFromOrdersCollection(random);
        if (order == null) { return; }
        // count the removal here rather than in removeOrderFromOrdersCollection,
        // which is also used to pick orders for update
        synchronized (orders) {
            ++numberOfDeletedOrders;
        }
        int orderId = order.getId();
        // replace order with its current representation
        order = session.find(Order.class, orderId);
        if (order == null) { return; }

        List<OrderLine> orderLines = getOrderLines(session, queryOrderType, orderId);
        removeOrderLinesFromOrderLinesCollection(orderLines);
        session.deletePersistentAll(orderLines);
        session.deletePersistent(order);
    }

    /** Query the order lines that belong to an order.
     * @param session the session
     * @param queryOrderType the query definition with an "orderId" parameter
     * @param orderId the id of the order
     * @return the result list of the query
     */
    private List<OrderLine> getOrderLines(Session session, QueryDomainType<OrderLine> queryOrderType, int orderId) {
        Query<OrderLine> query = session.createQuery(queryOrderType);
        query.setParameter("orderId", orderId);
        return query.getResultList();
    }

    /** Remove a randomly chosen order from the orders collection (multithread safe).
     * @param random a random number generator
     * @return the removed order, or null if the collection holds fewer than 10 orders
     */
    private Order removeOrderFromOrdersCollection(Random random) {
        synchronized (orders) {
            int numberOfOrders = orders.size();
            if (numberOfOrders < 10) {
                return null;
            }
            return orders.remove(random.nextInt(numberOfOrders));
        }
    }

    /** Remove order lines from the order lines collection and count them
     * as deleted (multithread safe).
     * @param orderLinesToRemove the order lines to remove
     */
    private void removeOrderLinesFromOrderLinesCollection(Collection<OrderLine> orderLinesToRemove) {
        synchronized (orderlines) {
            orderlines.removeAll(orderLinesToRemove);
            numberOfDeletedOrderLines += orderLinesToRemove.size();
        }
    }

    /** Add a new customer to the list of customers (multithread safe)
     * @param customer the customer to add
     */
    private void addCustomer(Customer customer) {
        synchronized (customers) {
            customers.add(customer);
        }
    }

    /** Get the next customer number (multithread safe)
     * @return the next customer id
     */
    private int getNextCustomerId() {
        synchronized (customers) {
            return nextCustomerId++;
        }
    }

    /** Get the next order number (multithread safe)
     * @return the next order number
     */
    private int getNextOrderId() {
        synchronized (orders) {
            return nextOrderId++;
        }
    }

    /** Get the next order line number (multithread safe)
     * @return the next order line number
     */
    private int getNextOrderLineId() {
        synchronized (orderlines) {
            return nextOrderLineId++;
        }
    }

    /** Add an order to the list of orders. (multithread safe)
     * @param order the order
     */
    private void addOrder(Order order) {
        synchronized (orders) {
            orders.add(order);
        }
    }

    /** Add a collection of orders to the list of orders. (multithread safe)
     * @param newOrders the collection of orders
     */
    private void addOrders(Collection<Order> newOrders) {
        synchronized (orders) {
            orders.addAll(newOrders);
        }
    }

    /** Add an order line to the list of order lines. (multithread safe)
     * @param orderLine the order line
     */
    private void addOrderLine(OrderLine orderLine) {
        synchronized (orderlines) {
            orderlines.add(orderLine);
        }
    }

}