1 /*
2    Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 package testsuite.clusterj;
26 
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Comparator;
30 import java.util.List;
31 import java.util.Random;
32 import java.util.Set;
33 import java.util.TreeSet;
34 
35 import com.mysql.clusterj.Query;
36 import com.mysql.clusterj.Session;
37 import com.mysql.clusterj.query.QueryDomainType;
38 
39 import testsuite.clusterj.model.Customer;
40 import testsuite.clusterj.model.Order;
41 import testsuite.clusterj.model.OrderLine;
42 
43 public class MultithreadedTest extends AbstractClusterJModelTest {
44 
45     @Override
getDebug()46     protected boolean getDebug() {
47         return false;
48     }
49 
50     private int numberOfThreads = 50;
51     private int numberOfNewCustomersPerThread = 5;
52     private int numberOfNewOrdersPerNewCustomer = 5;
53     private int numberOfUpdatesPerThread = 2;
54 
55     private int maximumOrderLinesPerOrder = 5;
56     private int maximumQuantityPerOrderLine = 100;
57     private int maximumUnitPrice = 100;
58 
59 
60     private int numberOfInitialCustomers = 10;
61     private int nextCustomerId = numberOfInitialCustomers;
62     private int nextOrderId = 0;
63     private int nextOrderLineId = 0;
64 
65     private int numberOfUpdatedOrderLines = 0;
66     private int numberOfDeletedOrders = 0;
67     private int numberOfDeletedOrderLines = 0;
68 
69     private ThreadGroup threadGroup;
70 
71     /** Customers */
72     List<Customer> customers = new ArrayList<Customer>();
73 
74     /** Orders */
75     List<Order> orders = new ArrayList<Order>();
76 
77     /** Order lines */
78     Set<OrderLine> orderlines = new TreeSet<OrderLine>(
79             new Comparator<OrderLine>() {
80                 public int compare(OrderLine o1, OrderLine o2) {
81                     return o1.getId() - o2.getId();
82                 }
83             }
84         );
85 
86     @Override
localSetUp()87     public void localSetUp() {
88         createSessionFactory();
89         session = sessionFactory.getSession();
90         // first delete all customers, orders, and order lines
91         tx = session.currentTransaction();
92         tx.begin();
93         session.deletePersistentAll(Customer.class);
94         session.deletePersistentAll(Order.class);
95         session.deletePersistentAll(OrderLine.class);
96         tx.commit();
97         // start out with some customers
98         createCustomerInstances(nextCustomerId);
99         // add new customer instances
100         tx.begin();
101         session.makePersistentAll(customers);
102         tx.commit();
103         // get rid of them when we're done
104         addTearDownClasses(Customer.class);
105         addTearDownClasses(Order.class);
106         addTearDownClasses(OrderLine.class);
107     }
108 
createCustomerInstances(int numberToCreate)109     private void createCustomerInstances(int numberToCreate) {
110         for (int i = 0; i < numberToCreate; ++i) {
111             Customer customer = session.newInstance(Customer.class);
112             customer.setId(i);
113             customer.setName("Customer number " + i);
114             customer.setMagic(i * 100);
115             customers.add(customer);
116         }
117     }
118 
119     /** The test method creates numberOfThreads threads and starts them.
120      * Once the threads are started, the main thread waits until all threads complete.
121      * The main thread then checks that the proper number of instances are
122      * created in the database and verifies that all orders are consistent
123      * with their order lines. Inconsistency might be due to thread interaction
124      * or improper database updates.
125      */
test()126     public void test() {
127         List<Thread> threads = new ArrayList<Thread>();
128         // create thread group
129         threadGroup = new ThreadGroup("Stuff");
130         // create uncaught exception handler
131         MyUncaughtExceptionHandler uncaughtExceptionHandler = new MyUncaughtExceptionHandler();
132         Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
133         // create all threads
134         for (int i = 0; i < numberOfThreads ; ++i) {
135             Thread thread = new Thread(threadGroup, new StuffToDo());
136             threads.add(thread);
137             thread.start();
138         }
139         // wait until all threads have finished
140         for (Thread t: threads) {
141             try {
142                 t.join();
143             } catch (InterruptedException e) {
144                 throw new RuntimeException("Interrupted while joining threads.");
145             }
146         }
147         // if any uncaught exceptions (from threads) signal an error
148         for (Throwable thrown: uncaughtExceptionHandler.getUncaughtExceptions()) {
149             error("Caught exception: " + thrown.getClass().getName() + ": " + thrown.getMessage());
150             StackTraceElement[] elements = thrown.getStackTrace();
151             for (StackTraceElement element: elements) {
152                 error("        at " + element.toString());
153             }
154         }
155         // summarize for the record
156         if (getDebug()) {
157             System.out.println("Number of threads: " + numberOfThreads +
158                 "; number of new customers per thread: " + numberOfNewCustomersPerThread +
159                 "; number of orders per new customer: " + numberOfNewOrdersPerNewCustomer);
160             System.out.println("Created " + nextCustomerId + " customers; " +
161                 nextOrderId + " orders; and " + nextOrderLineId + " order lines.");
162             System.out.println("Deleted " + numberOfDeletedOrders + " orders; and " +
163                 numberOfDeletedOrderLines + " order lines.");
164             System.out.println("Updated " + numberOfUpdatedOrderLines + " order lines.");
165         }
166         errorIfNotEqual("Failed to create customers.",
167                 numberOfThreads * numberOfNewCustomersPerThread + numberOfInitialCustomers, nextCustomerId);
168         errorIfNotEqual("Failed to create orders. ",
169                 numberOfThreads * numberOfNewCustomersPerThread * numberOfNewOrdersPerNewCustomer, nextOrderId);
170         // double check the orders to make sure they were updated correctly
171         Session session = sessionFactory.getSession();
172         QueryDomainType<OrderLine> queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class);
173         queryOrderType.where(queryOrderType.get("orderId").equal(queryOrderType.param("orderId")));
174         Query<OrderLine> query = session.createQuery(queryOrderType);
175         for (Order order: orders) {
176             int orderId = order.getId();
177             // replace order with its persistent representation
178             order = session.find(Order.class, orderId);
179             double expectedTotal = order.getValue();
180             double actualTotal = 0.0d;
181             for (OrderLine orderLine: getOrderLines(session, query, orderId)) {
182                 actualTotal += orderLine.getTotalValue();
183             }
184             errorIfNotEqual("For order " + orderId + ", order value does not equal sum of order line values.",
185                     expectedTotal, actualTotal);
186         }
187         failOnError();
188     }
189 
190     /** This class implements the logic per thread. For each thread created,
191      * the run method is invoked.
192      * Each thread uses its own session and shares the customer, order, and order line
193      * collections. Collections are synchronized to avoid threading conflicts.
194      * Each thread creates numberOfNewCustomersPerThread customers, each of which
195      * contains a random number of orders, each of which contains a random number
196      * of order lines.
197      * Each thread then updates numberOfUpdatesPerThread orders by changing one
198      * order line.
199      * Each thread then deletes one order and its associated order lines.
200      */
201     class StuffToDo implements Runnable {
202 
203         private Random myRandom = new Random();
204 
run()205         public void run() {
206             // get my own session
207             Session session = sessionFactory.getSession();
208             QueryDomainType<OrderLine> queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class);
209             queryOrderType.where(queryOrderType.get("orderId").equal(queryOrderType.param("orderId")));
210             Query<OrderLine> query = session.createQuery(queryOrderType);
211             for (int i = 0; i < numberOfNewCustomersPerThread; ++i) {
212                 // create a new customer
213                 createCustomer(session, String.valueOf(Thread.currentThread().getId()));
214                 for (int j = 0; j < numberOfNewOrdersPerNewCustomer ; ++j) {
215                     // create a new order
216                     createOrder(session, myRandom);
217                 }
218             }
219             // update orders
220             for (int j = 0; j < numberOfUpdatesPerThread; ++j) {
221                 updateOrder(session, myRandom, query);
222             }
223             // delete an order
224             deleteOrder(session, myRandom, query);
225         }
226 
227     }
228 
229     /** Create a new customer.
230      *
231      * @param session the session
232      * @param threadId the thread id of the creating thread
233      */
createCustomer(Session session, String threadId)234     private void createCustomer(Session session, String threadId) {
235         Customer customer = session.newInstance(Customer.class);
236         int id = getNextCustomerId();
237         customer.setId(id);
238         customer.setName("Customer number " + id + " thread " + threadId);
239         customer.setMagic(id * 10000);
240         session.makePersistent(customer); // autocommit for this
241         addCustomer(customer);
242     }
243 
244     /** Create a new order. Add a new order with a random number of order lines
245      * and a random unit price and quantity.
246      *
247      * @param session the session
248      * @param random a random number generator
249      */
createOrder(Session session, Random random)250     public void createOrder(Session session, Random random) {
251         session.currentTransaction().begin();
252         // get an order number
253         int orderid = getNextOrderId();
254         Order order = session.newInstance(Order.class);
255         order.setId(orderid);
256         // get a random customer number
257         int customerId = random .nextInt(nextCustomerId);
258         order.setCustomerId(customerId);
259         order.setDescription("Order " + orderid + " for Customer " + customerId);
260         Double orderValue = 0.0d;
261         // now create some order lines
262         int numberOfOrderLines = random.nextInt(maximumOrderLinesPerOrder);
263         for (int i = 0; i < numberOfOrderLines; ++i) {
264             int orderLineNumber = getNextOrderLineId();
265             OrderLine orderLine = session.newInstance(OrderLine.class);
266             orderLine.setId(orderLineNumber);
267             orderLine.setOrderId(orderid);
268             long quantity = random.nextInt(maximumQuantityPerOrderLine);
269             orderLine.setQuantity(quantity);
270             float unitPrice = ((float)random.nextInt(maximumUnitPrice)) / 4;
271             orderLine.setUnitPrice(unitPrice);
272             double orderLineValue = unitPrice * quantity;
273             orderValue += orderLineValue;
274             if (getDebug()) System.out.println("For order " + orderid + " orderline " + orderLineNumber +
275                     " order line value " + orderLineValue + " order value " + orderValue);
276             orderLine.setTotalValue(orderLineValue);
277             addOrderLine(orderLine);
278             session.persist(orderLine);
279         }
280         order.setValue(orderValue);
281         session.persist(order);
282         session.currentTransaction().commit();
283         addOrder(order);
284     }
285 
286     /** Update an order; change one or more order lines
287      *
288      * @param session the session
289      * @param random a random number generator
290      * @param query
291      */
updateOrder(Session session, Random random, Query<OrderLine> query)292     public void updateOrder(Session session, Random random, Query<OrderLine> query) {
293         session.currentTransaction().begin();
294         Order order = null;
295         // pick an order to update; prevent anyone else from updating the same order
296         order = removeOrderFromOrdersCollection(random);
297         if (order == null) {
298             return;
299         }
300         int orderId = order.getId();
301         // replace order with its persistent representation
302         order = session.find(Order.class, orderId);
303         List<OrderLine> orderLines = getOrderLines(session, query, orderId);
304         int numberOfOrderLines = orderLines.size();
305         OrderLine orderLine = null;
306         double orderValue = order.getValue();
307         if (numberOfOrderLines > 0) {
308             int index = random.nextInt(numberOfOrderLines);
309             orderLine = orderLines.get(index);
310             orderValue -= orderLine.getTotalValue();
311             updateOrderLine(orderLine, random);
312             orderValue += orderLine.getTotalValue();
313         }
314         order.setValue(orderValue);
315         session.updatePersistent(orderLine);
316         session.updatePersistent(order);
317         session.currentTransaction().commit();
318         // put order back now that we're done updating it
319         addOrder(order);
320     }
321 
322     /** Update an order line by randomly changing unit price and quantity.
323      *
324      * @param orderLine the order line to update
325      * @param random a random number generator
326      */
updateOrderLine(OrderLine orderLine, Random random)327     private void updateOrderLine(OrderLine orderLine, Random random) {
328         int orderid = orderLine.getOrderId();
329         int orderLineNumber = orderLine.getId();
330         double previousValue = orderLine.getTotalValue();
331         long quantity = random.nextInt(maximumQuantityPerOrderLine );
332         orderLine.setQuantity(quantity);
333         float unitPrice = ((float)random.nextInt(maximumUnitPrice)) / 4;
334         orderLine.setUnitPrice(unitPrice);
335         double orderLineValue = unitPrice * quantity;
336         orderLine.setTotalValue(orderLineValue);
337         if (getDebug()) System.out.println("For order " + orderid + " orderline " + orderLineNumber +
338                 " previous order line value "  + previousValue + " new order line value " + orderLineValue);
339         synchronized (orderlines) {
340             ++numberOfUpdatedOrderLines;
341         }
342     }
343 
344     /** Delete an order from the database.
345      *
346      * @param session the session
347      * @param random a random number generator
348      * @param query the query instance to query for OrderLines by OrderId
349      */
deleteOrder(Session session, Random random, Query<OrderLine> query)350     public void deleteOrder(Session session, Random random, Query<OrderLine> query) {
351         session.currentTransaction().begin();
352         Order order = null;
353         // pick an order to delete
354         order = removeOrderFromOrdersCollection(random);
355         if (order == null) {
356             return;
357         }
358         int orderId = order.getId();
359         List<OrderLine> orderLines = getOrderLines(session, query, orderId);
360         removeOrderLinesFromOrderLinesCollection(orderLines);
361         session.deletePersistentAll(orderLines);
362         session.deletePersistent(order);
363         session.currentTransaction().commit();
364     }
365 
getOrderLines(Session session, Query<OrderLine> query, int orderId)366     private List<OrderLine> getOrderLines(Session session, Query<OrderLine> query, int orderId) {
367         query.setParameter("orderId", orderId);
368         return query.getResultList();
369     }
370 
removeOrderFromOrdersCollection(Random random)371     private Order removeOrderFromOrdersCollection(Random random) {
372         synchronized(orders) {
373             int numberOfOrders = orders.size();
374             if (numberOfOrders < 10) {
375                 return null;
376             }
377             int orderId = random.nextInt(numberOfOrders);
378             ++numberOfDeletedOrders;
379             return orders.remove(orderId);
380         }
381     }
382 
removeOrderLinesFromOrderLinesCollection(Collection<OrderLine> orderLinesToRemove)383     private void removeOrderLinesFromOrderLinesCollection(Collection<OrderLine> orderLinesToRemove) {
384         synchronized(orderlines) {
385             orderlines.removeAll(orderLinesToRemove);
386             numberOfDeletedOrderLines += orderLinesToRemove.size();
387         }
388     }
389 
390     /** Add a new customer to the list of customers
391      * @param customer
392      */
addCustomer(Customer customer)393     private void addCustomer(Customer customer) {
394         synchronized(customers) {
395             customers.add(customer);
396         }
397     }
398 
399     /** Get the next customer number (multithread safe)
400      * @return
401      */
getNextCustomerId()402     private int getNextCustomerId() {
403         synchronized(customers) {
404             return nextCustomerId++;
405         }
406     }
407 
408     /** Get the next order number (multithread safe)
409      * @return
410      */
getNextOrderId()411     private int getNextOrderId() {
412         synchronized(orders) {
413             return nextOrderId++;
414         }
415     }
416 
417     /** Get the next order line number (multithread safe)
418      * @return
419      */
getNextOrderLineId()420     private int getNextOrderLineId() {
421         synchronized(orderlines) {
422             return nextOrderLineId++;
423         }
424     }
425 
426     /** Add an order to the list of orders.
427      *
428      * @param order the order
429      */
addOrder(Order order)430     private void addOrder(Order order) {
431         synchronized(orders) {
432             orders.add(order);
433         }
434     }
435 
436     /** Add an order line to the list of order lines.
437      *
438      * @param orderLine the order line
439      */
addOrderLine(OrderLine orderLine)440     private void addOrderLine(OrderLine orderLine) {
441         synchronized(orderlines) {
442             orderlines.add(orderLine);
443         }
444     }
445 
446 }
447