1 /*
2    Copyright (c) 2010, 2018, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 package testsuite.clusterj;
26 
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Comparator;
30 import java.util.List;
31 import java.util.Random;
32 import java.util.Set;
33 import java.util.TreeSet;
34 
35 import com.mysql.clusterj.ClusterJException;
36 import com.mysql.clusterj.ClusterJUserException;
37 import com.mysql.clusterj.Query;
38 import com.mysql.clusterj.Session;
39 import com.mysql.clusterj.query.QueryDomainType;
40 
41 import testsuite.clusterj.model.Customer;
42 import testsuite.clusterj.model.Order;
43 import testsuite.clusterj.model.OrderLine;
44 
45 @org.junit.Ignore("Bug#28550140 : disable test until diagnosis of failure")
46 public class ReconnectTest extends AbstractClusterJModelTest {
47 
48     @Override
getDebug()49     protected boolean getDebug() {
50         return false;
51     }
52 
53     private int numberOfThreads = 30;
54     private int numberOfNewCustomersPerThread = 5;
55     private int numberOfNewOrdersPerNewCustomer = 5;
56     private int numberOfUpdatesPerThread = 2;
57 
58     private int maximumOrderLinesPerOrder = 5;
59     private int maximumQuantityPerOrderLine = 100;
60     private int maximumUnitPrice = 100;
61 
62 
63     private int numberOfInitialCustomers = 10;
64     private int nextCustomerId = numberOfInitialCustomers;
65     private int nextOrderId = 0;
66     private int nextOrderLineId = 0;
67 
68     private int numberOfUpdatedOrderLines = 0;
69     private int numberOfDeletedOrders = 0;
70     private int numberOfDeletedOrderLines = 0;
71 
72     private ThreadGroup threadGroup;
73 
74     // member variables for synchronization between threads
75     private int numberOfThreadsReady = 0;
76     final private Object numberOfThreadsReadySync = new Object();
77 
incrementNumberOfThreadsReady()78     private void incrementNumberOfThreadsReady() {
79         synchronized (numberOfThreadsReadySync) {
80             numberOfThreadsReady++;
81             numberOfThreadsReadySync.notify();
82         }
83     }
84 
waitUntilAtleastNumberOfThreadsReady(int value)85     private void waitUntilAtleastNumberOfThreadsReady(int value) {
86         synchronized (numberOfThreadsReadySync) {
87             while(numberOfThreadsReady < value) {
88                 logger.warn("Waiting on " + value);
89                 try {
90                     numberOfThreadsReadySync.wait();
91                 } catch (InterruptedException ie) {
92                     ie.printStackTrace();
93                 }
94             }
95         }
96     }
97 
98     /** Customers */
99     List<Customer> customers = new ArrayList<Customer>();
100 
101     /** Orders */
102     List<Order> orders = new ArrayList<Order>();
103 
104     /** Order lines */
105     Set<OrderLine> orderlines = new TreeSet<OrderLine>(
106             new Comparator<OrderLine>() {
107                 public int compare(OrderLine o1, OrderLine o2) {
108                     return o1.getId() - o2.getId();
109                 }
110             }
111         );
112 
113     @Override
localSetUp()114     public void localSetUp() {
115         createSessionFactory();
116         session = sessionFactory.getSession();
117         // first delete all customers, orders, and order lines
118         tx = session.currentTransaction();
119         tx.begin();
120         session.deletePersistentAll(Customer.class);
121         session.deletePersistentAll(Order.class);
122         session.deletePersistentAll(OrderLine.class);
123         tx.commit();
124         // start out with some customers
125         createCustomerInstances(nextCustomerId);
126         // add new customer instances
127         tx.begin();
128         session.makePersistentAll(customers);
129         tx.commit();
130         // get rid of them when we're done
131         addTearDownClasses(Customer.class);
132         addTearDownClasses(Order.class);
133         addTearDownClasses(OrderLine.class);
134         session.close();
135     }
136 
sleep(long millis)137     private void sleep(long millis) {
138         try {
139             Thread.sleep(millis);
140         } catch (InterruptedException e) {
141             e.printStackTrace();
142         }
143     }
createCustomerInstances(int numberToCreate)144     private void createCustomerInstances(int numberToCreate) {
145         for (int i = 0; i < numberToCreate; ++i) {
146             Customer customer = session.newInstance(Customer.class);
147             customer.setId(i);
148             customer.setName("Customer number " + i + " (initial)");
149             customer.setMagic(i * 100);
150             customers.add(customer);
151         }
152     }
153 
154     /** The test method creates numberOfThreads threads and starts them.
155      * Once the threads are started, the main thread waits until all threads complete.
156      * The main thread then checks that the proper number of instances are
157      * created in the database and verifies that all orders are consistent
158      * with their order lines. Inconsistency might be due to thread interaction
159      * or improper database updates.
160      */
test()161     public void test() {
162         List<Thread> threads = new ArrayList<Thread>();
163         // create thread group
164         threadGroup = new ThreadGroup("Stuff");
165         // create uncaught exception handler
166         MyUncaughtExceptionHandler uncaughtExceptionHandler = new MyUncaughtExceptionHandler();
167         Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
168         // create and start the thread that misbehaves
169         Thread misbehaving = new Thread(threadGroup, new Misbehaving());
170         misbehaving.start();
171         // wait for it to begin
172         waitUntilAtleastNumberOfThreadsReady(1);
173         // create all normal threads
174         for (int i = 0; i < numberOfThreads ; ++i) {
175             Thread thread = new Thread(threadGroup, new StuffToDo());
176             threads.add(thread);
177         }
178         // start all threads
179         for (Thread thread: threads) {
180             thread.start();
181         }
182         // wait until atleast one StuffToDo thread is ready
183         waitUntilAtleastNumberOfThreadsReady(2);
184         // tell the SessionFactory to reconnect
185         sessionFactory.reconnect(5);
186         // wait until all threads have finished
187         threads.add(misbehaving);
188         for (Thread t: threads) {
189             try {
190                 t.join();
191             } catch (InterruptedException e) {
192                 throw new RuntimeException("Interrupted while joining threads.");
193             }
194         }
195         // if any uncaught exceptions (from threads) signal an error
196         for (Throwable thrown: uncaughtExceptionHandler.getUncaughtExceptions()) {
197             error("Caught exception: " + thrown.getClass().getName() + ": " + thrown.getMessage());
198             StackTraceElement[] elements = thrown.getStackTrace();
199             for (StackTraceElement element: elements) {
200                 error("        at " + element.toString());
201             }
202         }
203         // summarize for the record
204         if (retryCount < 5) error ("Retry count too low: " + retryCount);
205         if (getDebug()) {
206             System.out.println("Retry count: " + retryCount);
207             System.out.println("Number of threads: " + numberOfThreads +
208                 "; number of new customers per thread: " + numberOfNewCustomersPerThread +
209                 "; number of orders per new customer: " + numberOfNewOrdersPerNewCustomer);
210             System.out.println("Created " + nextCustomerId + " customers; " +
211                 nextOrderId + " orders; and " + nextOrderLineId + " order lines.");
212             System.out.println("Deleted " + numberOfDeletedOrders + " orders; and " +
213                 numberOfDeletedOrderLines + " order lines.");
214             System.out.println("Updated " + numberOfUpdatedOrderLines + " order lines.");
215         }
216         errorIfNotEqual("Failed to create customers.",
217                 numberOfThreads * numberOfNewCustomersPerThread + numberOfInitialCustomers, nextCustomerId);
218         errorIfNotEqual("Failed to create orders. ",
219                 numberOfThreads * numberOfNewCustomersPerThread * numberOfNewOrdersPerNewCustomer, nextOrderId);
220         // double check the orders to make sure they were updated correctly
221         boolean done = false;
222         while (!done) {
223             if (getDebug()) { System.out.println("verifying..."); }
224             try (Session session = sessionFactory.getSession()) {
225                 QueryDomainType<OrderLine> queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class);
226                 queryOrderType.where(queryOrderType.get("orderId").equal(queryOrderType.param("orderId")));
227                 if (getDebug()) { System.out.println("checking orders: " + orders.size()); }
228                 for (Order order: orders) {
229                     int orderId = order.getId();
230                     if (getDebug()) System.out.println("Read order " + orderId + " total " + order.getValue());
231                     // replace order with its persistent representation
232                     order = session.find(Order.class, orderId);
233                     double expectedTotal = order.getValue();
234                     double actualTotal = 0.0d;
235                     StringBuffer messages = new StringBuffer();
236                     List<OrderLine> orderLines = new ArrayList<OrderLine>();
237                     for (OrderLine orderLine: getOrderLines(session, queryOrderType, orderId)) {
238                         orderLines.add(orderLine);
239                         String message = "order " + orderLine.getOrderId() +
240                                 " orderline " + orderLine.getId() + " value " + orderLine.getTotalValue();
241                         if (getDebug()) System.out.println(message);
242                         messages.append(message);
243                         messages.append('\n');
244                         actualTotal += orderLine.getTotalValue();
245                     }
246                     errorIfNotEqual("For order " + orderId + ", order value does not equal sum of order line values."
247                             + " orderLines: \n" + messages.toString(),
248                             expectedTotal, actualTotal);
249                 }
250                 done = true;
251             } catch (Throwable t) {
252                 if (getDebug()) { System.out.println("summarize for the record caught " + t.getMessage()); }
253                 sleep(1000);
254             }
255         }
256         failOnError();
257     }
258 
259     private int retryCount = 0;
incrementRetryCount()260     private void incrementRetryCount() {
261         synchronized(this) {
262             ++retryCount;
263         }
264     }
265     class Misbehaving implements Runnable {
266         @Override
run()267         public void run() {
268             Session session = sessionFactory.getSession();
269             session.currentTransaction().begin();
270             boolean done = false;
271             QueryDomainType<OrderLine> queryOrderType;
272             // increment status to indicate we are running
273             incrementNumberOfThreadsReady();
274             while (!done) {
275                 try {
276                 queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class);
277                 queryOrderType.where(queryOrderType.get("orderId").greaterThan(
278                         (queryOrderType.param("orderId"))));
279                 Query<OrderLine> queryOrder = session.createQuery(queryOrderType);
280                 queryOrder.setParameter("orderId", 0);
281                 queryOrder.getResultList();
282                 sleep(100);
283                 } catch (ClusterJException cje) {
284                     // the exception might be any of several exceptions when disconnecting/reconnecting
285                     done = true;
286                 }
287             }
288         }
289 
290     }
291     /** This class implements the logic per thread. For each thread created,
292      * the run method is invoked.
293      * Each thread uses its own session and shares the customer, order, and order line
294      * collections. Collections are synchronized to avoid threading conflicts.
295      * Each thread creates numberOfNewCustomersPerThread customers, each of which
296      * contains a random number of orders, each of which contains a random number
297      * of order lines.
298      * Each thread then updates numberOfUpdatesPerThread orders by changing one
299      * order line.
300      * Each thread then deletes one order and its associated order lines.
301      */
302     class StuffToDo implements Runnable {
303 
304         private Random myRandom = new Random();
305 
run()306         public void run() {
307             // get a session for the queryOrderType
308             QueryDomainType<OrderLine> queryOrderType = null;
309             boolean done = false;
310             while (!done) {
311                 try (Session session = sessionFactory.getSession()) {
312                     queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class);
313                     queryOrderType.where(queryOrderType.get("orderId").equal(queryOrderType.param("orderId")));
314                     session.close();
315                     done = true;
316                 } catch (ClusterJUserException cjue) {
317                     if (getDebug()) { System.out.println("StuffToDo: query orderId caught " + cjue.getMessage()); }
318                     if (cjue.getMessage().contains("SessionFactory is not open")) {
319                         sleep(300);
320                     }
321                 }
322             }
323             // tell that this thread is ready
324             incrementNumberOfThreadsReady();
325             int i = 0;
326             while (i < numberOfNewCustomersPerThread) {
327                 // create a new customer
328                 try (Session localSession = sessionFactory.getSession()) {
329                     Customer newCustomer = null;
330                     List<Order> newOrders = new ArrayList<Order>(numberOfNewOrdersPerNewCustomer);
331                     localSession.currentTransaction().begin();
332                     newCustomer = createCustomer(localSession, String.valueOf(Thread.currentThread().getId()));
333                     int customerId = newCustomer.getId();
334                     for (int j = 0; j < numberOfNewOrdersPerNewCustomer ; ++j) {
335                             // create a new order for the customer
336                             newOrders.add(createOrder(localSession, customerId, myRandom));
337                     }
338                     ++i;
339                     localSession.currentTransaction().commit();
340                     // add new customer and orders only if successful
341                     addCustomer(newCustomer);
342                     addOrders(newOrders);
343                 } catch (ClusterJUserException cjue) {
344                     if (getDebug()) { System.out.println("StuffToDo: create customer caught " + cjue.getMessage()); }
345                     if (cjue.getMessage().contains("SessionFactory is not open")) {
346                         incrementRetryCount();
347                         sleep(300);
348                     }
349                 }
350             }
351             // update orders
352             i = 0;
353             while (i < numberOfUpdatesPerThread) {
354                 try (Session localSession = sessionFactory.getSession()) {
355                     // update an order
356                     localSession.currentTransaction().begin();
357                     Order order = updateOrder(localSession, myRandom, queryOrderType);
358                     localSession.currentTransaction().commit();
359                     // put the updated order back
360                     addOrder(order);
361                     ++i;
362                 } catch (ClusterJUserException cjue) {
363                     if (getDebug()) { System.out.println("StuffToDo: update orders caught " + cjue.getMessage()); }
364                     if (cjue.getMessage().contains("SessionFactory is not open")) {
365                         incrementRetryCount();
366                         sleep(300);
367                     }
368                 }
369             }
370             // delete an order
371             done = false;
372             while (!done) {
373                 try (Session localSession = sessionFactory.getSession()) {
374                     // delete an order and all of its order lines
375                     localSession.currentTransaction().begin();
376                     deleteOrder(localSession, myRandom, queryOrderType);
377                     localSession.currentTransaction().commit();
378                     done = true;
379                 } catch (ClusterJUserException cjue) {
380                     if (getDebug()) { System.out.println("StuffToDo: delete order caught " + cjue.getMessage()); }
381                     if (cjue.getMessage().contains("SessionFactory is not open")) {
382                         incrementRetryCount();
383                         sleep(300);
384                     }
385                 }
386             }
387         }
388     }
389 
390     /** Create a new customer.
391      * @param session the session
392      * @param threadId the thread id of the creating thread
393      * @return the new customer
394      */
createCustomer(Session session, String threadId)395     private Customer createCustomer(Session session, String threadId) {
396         Customer customer = session.newInstance(Customer.class);
397         int id = getNextCustomerId();
398         customer.setId(id);
399         customer.setName("Customer number " + id + " thread " + threadId);
400         customer.setMagic(id * 10000);
401         session.makePersistent(customer);
402         return customer;
403     }
404 
405     /** Create a new order for a specific customer with a random number of order lines
406      * and a random unit price and quantity.
407      * @param session the session
408      * @param customer the customer
409      * @param random the random number generator
410      * @return the new order
411      */
createOrder(Session session, int customerId, Random random)412     public Order createOrder(Session session, int customerId, Random random) {
413         // get an order number
414         int orderid = getNextOrderId();
415         Order order = session.newInstance(Order.class);
416         order.setId(orderid);
417         order.setCustomerId(customerId);
418         order.setDescription("Order " + orderid + " for Customer " + customerId);
419         Double orderValue = 0.0d;
420         // now create some order lines
421         int numberOfOrderLines = random.nextInt(maximumOrderLinesPerOrder) + 1;
422         if (getDebug()) System.out.println("Create Order " + orderid
423                 + " with numberOfOrderLines: " + numberOfOrderLines);
424         for (int i = 0; i < numberOfOrderLines; ++i) {
425             int orderLineNumber = getNextOrderLineId();
426             OrderLine orderLine = session.newInstance(OrderLine.class);
427             orderLine.setId(orderLineNumber);
428             orderLine.setOrderId(orderid);
429             long quantity = random.nextInt(maximumQuantityPerOrderLine) + 1;
430             orderLine.setQuantity(quantity);
431             float unitPrice = (1.0f + (float)random.nextInt(maximumUnitPrice)) / 4;
432             orderLine.setUnitPrice(unitPrice);
433             double orderLineValue = unitPrice * quantity;
434             orderValue += orderLineValue;
435             if (getDebug()) System.out.println("Create orderline " + orderLineNumber + " for Order " + orderid
436                     + " quantity " + quantity + " price " + unitPrice
437                     + " order line value " + orderLineValue + " order value " + orderValue);
438             orderLine.setTotalValue(orderLineValue);
439             addOrderLine(orderLine);
440             session.persist(orderLine);
441         }
442         order.setValue(orderValue);
443         session.persist(order);
444         return order;
445     }
446 
447     /** Update an order; change one or more order lines
448      * @param session the session
449      * @param random a random number generator
450      * @param query
451      */
updateOrder(Session session, Random random, QueryDomainType<OrderLine> queryOrderType)452     public Order updateOrder(Session session, Random random, QueryDomainType<OrderLine> queryOrderType) {
453         Order order = null;
454         // pick an order to update; prevent anyone else from updating the same order
455         order = removeOrderFromOrdersCollection(random);
456         if (order == null) { return null; }
457         int orderId = order.getId();
458         // replace order with its persistent representation
459         order = session.find(Order.class, orderId);
460         if (order == null) { return null; }
461         List<OrderLine> orderLines = getOrderLines(session, queryOrderType, orderId);
462         int numberOfOrderLines = orderLines.size();
463         OrderLine orderLine = null;
464         double orderValue = order.getValue();
465         if (getDebug()) { System.out.println("updateOrder previous orderValue: " + orderValue); }
466         if (numberOfOrderLines > 0) {
467             int index = random.nextInt(numberOfOrderLines);
468             orderLine = orderLines.get(index);
469             orderValue -= orderLine.getTotalValue();
470             updateOrderLine(orderLine, random);
471             orderValue += orderLine.getTotalValue();
472         }
473         if (getDebug()) { System.out.println("updateOrder updated orderValue: " + orderValue); }
474         order.setValue(orderValue);
475         session.updatePersistent(orderLine);
476         session.updatePersistent(order);
477         // return order so it can be put back after committing the transaction
478         return order;
479     }
480 
481     /** Update an order line by randomly changing unit price and quantity.
482      * @param orderLine the order line to update
483      * @param random a random number generator
484      */
updateOrderLine(OrderLine orderLine, Random random)485     private void updateOrderLine(OrderLine orderLine, Random random) {
486         int orderid = orderLine.getOrderId();
487         int orderLineNumber = orderLine.getId();
488         double previousValue = orderLine.getTotalValue();
489         long quantity = random.nextInt(maximumQuantityPerOrderLine );
490         orderLine.setQuantity(quantity);
491         float unitPrice = ((float)random.nextInt(maximumUnitPrice)) / 4;
492         orderLine.setUnitPrice(unitPrice);
493         double orderLineValue = unitPrice * quantity;
494         orderLine.setTotalValue(orderLineValue);
495         if (getDebug()) System.out.println("For order " + orderid + " orderline " + orderLineNumber +
496                 " previous order line value "  + previousValue + " new order line value " + orderLineValue);
497         synchronized (orderlines) {
498             ++numberOfUpdatedOrderLines;
499         }
500     }
501 
502     /** Delete an order from the database.
503      * @param session the session
504      * @param random a random number generator
505      * @param query the query instance to query for OrderLines by OrderId
506      */
deleteOrder(Session session, Random random, QueryDomainType<OrderLine> queryOrderType)507     public void deleteOrder(Session session, Random random, QueryDomainType<OrderLine> queryOrderType) {
508         Order order = null;
509         // pick an order to delete
510         order = removeOrderFromOrdersCollection(random);
511         if (order == null) { return; }
512         int orderId = order.getId();
513         // replace order with its current representation
514         order = session.find(Order.class, orderId);
515         if (order == null) { return; }
516 
517         List<OrderLine> orderLines = getOrderLines(session, queryOrderType, orderId);
518         removeOrderLinesFromOrderLinesCollection(orderLines);
519         session.deletePersistentAll(orderLines);
520         session.deletePersistent(order);
521     }
522 
getOrderLines(Session session, QueryDomainType<OrderLine> queryOrderType, int orderId)523     private List<OrderLine> getOrderLines(Session session, QueryDomainType<OrderLine> queryOrderType, int orderId) {
524         Query<OrderLine> query = session.createQuery(queryOrderType);
525         query.setParameter("orderId", orderId);
526         return query.getResultList();
527     }
528 
removeOrderFromOrdersCollection(Random random)529     private Order removeOrderFromOrdersCollection(Random random) {
530         synchronized(orders) {
531             int numberOfOrders = orders.size();
532             if (numberOfOrders < 10) {
533                 return null;
534             }
535             int which = random.nextInt(numberOfOrders);
536             ++numberOfDeletedOrders;
537             return orders.remove(which);
538         }
539     }
540 
removeOrderLinesFromOrderLinesCollection(Collection<OrderLine> orderLinesToRemove)541     private void removeOrderLinesFromOrderLinesCollection(Collection<OrderLine> orderLinesToRemove) {
542         synchronized(orderlines) {
543             orderlines.removeAll(orderLinesToRemove);
544             numberOfDeletedOrderLines += orderLinesToRemove.size();
545         }
546     }
547 
548     /** Add a new customer to the list of customers (multithread safe)
549      * @param customer the customer to add
550      */
addCustomer(Customer customer)551     private void addCustomer(Customer customer) {
552         synchronized(customers) {
553             customers.add(customer);
554         }
555     }
556 
557     /** Get the next customer number (multithread safe)
558      * @return the next customer id
559      */
getNextCustomerId()560     private int getNextCustomerId() {
561         synchronized(customers) {
562             int result = nextCustomerId++;
563             return result;
564         }
565     }
566 
567     /** Get the next order number (multithread safe)
568      * @return the next order number
569      */
getNextOrderId()570     private int getNextOrderId() {
571         synchronized(orders) {
572             int result = nextOrderId++;
573             return result;
574         }
575     }
576 
577     /** Get the next order line number (multithread safe)
578      * @return the next order line number
579      */
getNextOrderLineId()580     private int getNextOrderLineId() {
581         synchronized(orderlines) {
582             int result = nextOrderLineId++;
583             return result;
584         }
585     }
586 
587     /** Add an order to the list of orders. (multithread safe)
588      * @param order the order
589      */
addOrder(Order order)590     private void addOrder(Order order) {
591         synchronized(orders) {
592             orders.add(order);
593         }
594     }
595 
596     /** Add a collection of orders to the list of orders. (multithread safe)
597      * @param newOrders the collection of orders
598      */
addOrders(Collection<Order> newOrders)599     private void addOrders(Collection<Order> newOrders) {
600         synchronized(orders) {
601             orders.addAll(newOrders);
602         }
603     }
604 
605     /** Add an order line to the list of order lines. (multithread safe)
606      * @param orderLine the order line
607      */
addOrderLine(OrderLine orderLine)608     private void addOrderLine(OrderLine orderLine) {
609         synchronized(orderlines) {
610             orderlines.add(orderLine);
611         }
612     }
613 
614 }
615