1 /* 2 Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License, version 2.0, 6 as published by the Free Software Foundation. 7 8 This program is also distributed with certain software (including 9 but not limited to OpenSSL) that is licensed under separate terms, 10 as designated in a particular file or component or in included license 11 documentation. The authors of MySQL hereby grant you an additional 12 permission to link the program and your derivative works with the 13 separately licensed software that they have included with MySQL. 14 15 This program is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License, version 2.0, for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with this program; if not, write to the Free Software 22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 23 */ 24 25 package testsuite.clusterj; 26 27 import java.util.ArrayList; 28 import java.util.Collection; 29 import java.util.Comparator; 30 import java.util.List; 31 import java.util.Random; 32 import java.util.Set; 33 import java.util.TreeSet; 34 35 import com.mysql.clusterj.Query; 36 import com.mysql.clusterj.Session; 37 import com.mysql.clusterj.query.QueryDomainType; 38 39 import testsuite.clusterj.model.Customer; 40 import testsuite.clusterj.model.Order; 41 import testsuite.clusterj.model.OrderLine; 42 43 public class MultithreadedTest extends AbstractClusterJModelTest { 44 45 @Override getDebug()46 protected boolean getDebug() { 47 return false; 48 } 49 50 private int numberOfThreads = 50; 51 private int numberOfNewCustomersPerThread = 5; 52 private int numberOfNewOrdersPerNewCustomer = 5; 53 private int numberOfUpdatesPerThread = 2; 54 55 private int maximumOrderLinesPerOrder = 5; 56 private int maximumQuantityPerOrderLine = 100; 57 private int maximumUnitPrice = 100; 58 59 60 private int numberOfInitialCustomers = 10; 61 private int nextCustomerId = numberOfInitialCustomers; 62 private int nextOrderId = 0; 63 private int nextOrderLineId = 0; 64 65 private int numberOfUpdatedOrderLines = 0; 66 private int numberOfDeletedOrders = 0; 67 private int numberOfDeletedOrderLines = 0; 68 69 private ThreadGroup threadGroup; 70 71 /** Customers */ 72 List<Customer> customers = new ArrayList<Customer>(); 73 74 /** Orders */ 75 List<Order> orders = new ArrayList<Order>(); 76 77 /** Order lines */ 78 Set<OrderLine> orderlines = new TreeSet<OrderLine>( 79 new Comparator<OrderLine>() { 80 public int compare(OrderLine o1, OrderLine o2) { 81 return o1.getId() - o2.getId(); 82 } 83 } 84 ); 85 86 @Override localSetUp()87 public void localSetUp() { 88 createSessionFactory(); 89 session = sessionFactory.getSession(); 90 // first delete all customers, orders, and order lines 91 tx = session.currentTransaction(); 92 tx.begin(); 93 session.deletePersistentAll(Customer.class); 94 session.deletePersistentAll(Order.class); 95 session.deletePersistentAll(OrderLine.class); 96 tx.commit(); 97 // start out with some customers 98 createCustomerInstances(nextCustomerId); 99 // add new customer instances 100 tx.begin(); 101 session.makePersistentAll(customers); 102 tx.commit(); 103 // get rid of them when we're done 104 addTearDownClasses(Customer.class); 105 addTearDownClasses(Order.class); 106 addTearDownClasses(OrderLine.class); 107 } 108 createCustomerInstances(int numberToCreate)109 private void createCustomerInstances(int numberToCreate) { 110 for (int i = 0; i < numberToCreate; ++i) { 111 Customer customer = session.newInstance(Customer.class); 112 customer.setId(i); 113 customer.setName("Customer number " + i); 114 customer.setMagic(i * 100); 115 customers.add(customer); 116 } 117 } 118 119 /** The test method creates numberOfThreads threads and starts them. 120 * Once the threads are started, the main thread waits until all threads complete. 121 * The main thread then checks that the proper number of instances are 122 * created in the database and verifies that all orders are consistent 123 * with their order lines. Inconsistency might be due to thread interaction 124 * or improper database updates. 125 */ test()126 public void test() { 127 List<Thread> threads = new ArrayList<Thread>(); 128 // create thread group 129 threadGroup = new ThreadGroup("Stuff"); 130 // create uncaught exception handler 131 MyUncaughtExceptionHandler uncaughtExceptionHandler = new MyUncaughtExceptionHandler(); 132 Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); 133 // create all threads 134 for (int i = 0; i < numberOfThreads ; ++i) { 135 Thread thread = new Thread(threadGroup, new StuffToDo()); 136 threads.add(thread); 137 thread.start(); 138 } 139 // wait until all threads have finished 140 for (Thread t: threads) { 141 try { 142 t.join(); 143 } catch (InterruptedException e) { 144 throw new RuntimeException("Interrupted while joining threads."); 145 } 146 } 147 // if any uncaught exceptions (from threads) signal an error 148 for (Throwable thrown: uncaughtExceptionHandler.getUncaughtExceptions()) { 149 error("Caught exception: " + thrown.getClass().getName() + ": " + thrown.getMessage()); 150 StackTraceElement[] elements = thrown.getStackTrace(); 151 for (StackTraceElement element: elements) { 152 error(" at " + element.toString()); 153 } 154 } 155 // summarize for the record 156 if (getDebug()) { 157 System.out.println("Number of threads: " + numberOfThreads + 158 "; number of new customers per thread: " + numberOfNewCustomersPerThread + 159 "; number of orders per new customer: " + numberOfNewOrdersPerNewCustomer); 160 System.out.println("Created " + nextCustomerId + " customers; " + 161 nextOrderId + " orders; and " + nextOrderLineId + " order lines."); 162 System.out.println("Deleted " + numberOfDeletedOrders + " orders; and " + 163 numberOfDeletedOrderLines + " order lines."); 164 System.out.println("Updated " + numberOfUpdatedOrderLines + " order lines."); 165 } 166 errorIfNotEqual("Failed to create customers.", 167 numberOfThreads * numberOfNewCustomersPerThread + numberOfInitialCustomers, nextCustomerId); 168 errorIfNotEqual("Failed to create orders. ", 169 numberOfThreads * numberOfNewCustomersPerThread * numberOfNewOrdersPerNewCustomer, nextOrderId); 170 // double check the orders to make sure they were updated correctly 171 Session session = sessionFactory.getSession(); 172 QueryDomainType<OrderLine> queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class); 173 queryOrderType.where(queryOrderType.get("orderId").equal(queryOrderType.param("orderId"))); 174 Query<OrderLine> query = session.createQuery(queryOrderType); 175 for (Order order: orders) { 176 int orderId = order.getId(); 177 // replace order with its persistent representation 178 order = session.find(Order.class, orderId); 179 double expectedTotal = order.getValue(); 180 double actualTotal = 0.0d; 181 for (OrderLine orderLine: getOrderLines(session, query, orderId)) { 182 actualTotal += orderLine.getTotalValue(); 183 } 184 errorIfNotEqual("For order " + orderId + ", order value does not equal sum of order line values.", 185 expectedTotal, actualTotal); 186 } 187 failOnError(); 188 } 189 190 /** This class implements the logic per thread. For each thread created, 191 * the run method is invoked. 192 * Each thread uses its own session and shares the customer, order, and order line 193 * collections. Collections are synchronized to avoid threading conflicts. 194 * Each thread creates numberOfNewCustomersPerThread customers, each of which 195 * contains a random number of orders, each of which contains a random number 196 * of order lines. 197 * Each thread then updates numberOfUpdatesPerThread orders by changing one 198 * order line. 199 * Each thread then deletes one order and its associated order lines. 200 */ 201 class StuffToDo implements Runnable { 202 203 private Random myRandom = new Random(); 204 run()205 public void run() { 206 // get my own session 207 Session session = sessionFactory.getSession(); 208 QueryDomainType<OrderLine> queryOrderType = session.getQueryBuilder().createQueryDefinition(OrderLine.class); 209 queryOrderType.where(queryOrderType.get("orderId").equal(queryOrderType.param("orderId"))); 210 Query<OrderLine> query = session.createQuery(queryOrderType); 211 for (int i = 0; i < numberOfNewCustomersPerThread; ++i) { 212 // create a new customer 213 createCustomer(session, String.valueOf(Thread.currentThread().getId())); 214 for (int j = 0; j < numberOfNewOrdersPerNewCustomer ; ++j) { 215 // create a new order 216 createOrder(session, myRandom); 217 } 218 } 219 // update orders 220 for (int j = 0; j < numberOfUpdatesPerThread; ++j) { 221 updateOrder(session, myRandom, query); 222 } 223 // delete an order 224 deleteOrder(session, myRandom, query); 225 } 226 227 } 228 229 /** Create a new customer. 230 * 231 * @param session the session 232 * @param threadId the thread id of the creating thread 233 */ createCustomer(Session session, String threadId)234 private void createCustomer(Session session, String threadId) { 235 Customer customer = session.newInstance(Customer.class); 236 int id = getNextCustomerId(); 237 customer.setId(id); 238 customer.setName("Customer number " + id + " thread " + threadId); 239 customer.setMagic(id * 10000); 240 session.makePersistent(customer); // autocommit for this 241 addCustomer(customer); 242 } 243 244 /** Create a new order. Add a new order with a random number of order lines 245 * and a random unit price and quantity. 246 * 247 * @param session the session 248 * @param random a random number generator 249 */ createOrder(Session session, Random random)250 public void createOrder(Session session, Random random) { 251 session.currentTransaction().begin(); 252 // get an order number 253 int orderid = getNextOrderId(); 254 Order order = session.newInstance(Order.class); 255 order.setId(orderid); 256 // get a random customer number 257 int customerId = random .nextInt(nextCustomerId); 258 order.setCustomerId(customerId); 259 order.setDescription("Order " + orderid + " for Customer " + customerId); 260 Double orderValue = 0.0d; 261 // now create some order lines 262 int numberOfOrderLines = random.nextInt(maximumOrderLinesPerOrder); 263 for (int i = 0; i < numberOfOrderLines; ++i) { 264 int orderLineNumber = getNextOrderLineId(); 265 OrderLine orderLine = session.newInstance(OrderLine.class); 266 orderLine.setId(orderLineNumber); 267 orderLine.setOrderId(orderid); 268 long quantity = random.nextInt(maximumQuantityPerOrderLine); 269 orderLine.setQuantity(quantity); 270 float unitPrice = ((float)random.nextInt(maximumUnitPrice)) / 4; 271 orderLine.setUnitPrice(unitPrice); 272 double orderLineValue = unitPrice * quantity; 273 orderValue += orderLineValue; 274 if (getDebug()) System.out.println("For order " + orderid + " orderline " + orderLineNumber + 275 " order line value " + orderLineValue + " order value " + orderValue); 276 orderLine.setTotalValue(orderLineValue); 277 addOrderLine(orderLine); 278 session.persist(orderLine); 279 } 280 order.setValue(orderValue); 281 session.persist(order); 282 session.currentTransaction().commit(); 283 addOrder(order); 284 } 285 286 /** Update an order; change one or more order lines 287 * 288 * @param session the session 289 * @param random a random number generator 290 * @param query 291 */ updateOrder(Session session, Random random, Query<OrderLine> query)292 public void updateOrder(Session session, Random random, Query<OrderLine> query) { 293 session.currentTransaction().begin(); 294 Order order = null; 295 // pick an order to update; prevent anyone else from updating the same order 296 order = removeOrderFromOrdersCollection(random); 297 if (order == null) { 298 return; 299 } 300 int orderId = order.getId(); 301 // replace order with its persistent representation 302 order = session.find(Order.class, orderId); 303 List<OrderLine> orderLines = getOrderLines(session, query, orderId); 304 int numberOfOrderLines = orderLines.size(); 305 OrderLine orderLine = null; 306 double orderValue = order.getValue(); 307 if (numberOfOrderLines > 0) { 308 int index = random.nextInt(numberOfOrderLines); 309 orderLine = orderLines.get(index); 310 orderValue -= orderLine.getTotalValue(); 311 updateOrderLine(orderLine, random); 312 orderValue += orderLine.getTotalValue(); 313 } 314 order.setValue(orderValue); 315 session.updatePersistent(orderLine); 316 session.updatePersistent(order); 317 session.currentTransaction().commit(); 318 // put order back now that we're done updating it 319 addOrder(order); 320 } 321 322 /** Update an order line by randomly changing unit price and quantity. 323 * 324 * @param orderLine the order line to update 325 * @param random a random number generator 326 */ updateOrderLine(OrderLine orderLine, Random random)327 private void updateOrderLine(OrderLine orderLine, Random random) { 328 int orderid = orderLine.getOrderId(); 329 int orderLineNumber = orderLine.getId(); 330 double previousValue = orderLine.getTotalValue(); 331 long quantity = random.nextInt(maximumQuantityPerOrderLine ); 332 orderLine.setQuantity(quantity); 333 float unitPrice = ((float)random.nextInt(maximumUnitPrice)) / 4; 334 orderLine.setUnitPrice(unitPrice); 335 double orderLineValue = unitPrice * quantity; 336 orderLine.setTotalValue(orderLineValue); 337 if (getDebug()) System.out.println("For order " + orderid + " orderline " + orderLineNumber + 338 " previous order line value " + previousValue + " new order line value " + orderLineValue); 339 synchronized (orderlines) { 340 ++numberOfUpdatedOrderLines; 341 } 342 } 343 344 /** Delete an order from the database. 345 * 346 * @param session the session 347 * @param random a random number generator 348 * @param query the query instance to query for OrderLines by OrderId 349 */ deleteOrder(Session session, Random random, Query<OrderLine> query)350 public void deleteOrder(Session session, Random random, Query<OrderLine> query) { 351 session.currentTransaction().begin(); 352 Order order = null; 353 // pick an order to delete 354 order = removeOrderFromOrdersCollection(random); 355 if (order == null) { 356 return; 357 } 358 int orderId = order.getId(); 359 List<OrderLine> orderLines = getOrderLines(session, query, orderId); 360 removeOrderLinesFromOrderLinesCollection(orderLines); 361 session.deletePersistentAll(orderLines); 362 session.deletePersistent(order); 363 session.currentTransaction().commit(); 364 } 365 getOrderLines(Session session, Query<OrderLine> query, int orderId)366 private List<OrderLine> getOrderLines(Session session, Query<OrderLine> query, int orderId) { 367 query.setParameter("orderId", orderId); 368 return query.getResultList(); 369 } 370 removeOrderFromOrdersCollection(Random random)371 private Order removeOrderFromOrdersCollection(Random random) { 372 synchronized(orders) { 373 int numberOfOrders = orders.size(); 374 if (numberOfOrders < 10) { 375 return null; 376 } 377 int orderId = random.nextInt(numberOfOrders); 378 ++numberOfDeletedOrders; 379 return orders.remove(orderId); 380 } 381 } 382 removeOrderLinesFromOrderLinesCollection(Collection<OrderLine> orderLinesToRemove)383 private void removeOrderLinesFromOrderLinesCollection(Collection<OrderLine> orderLinesToRemove) { 384 synchronized(orderlines) { 385 orderlines.removeAll(orderLinesToRemove); 386 numberOfDeletedOrderLines += orderLinesToRemove.size(); 387 } 388 } 389 390 /** Add a new customer to the list of customers 391 * @param customer 392 */ addCustomer(Customer customer)393 private void addCustomer(Customer customer) { 394 synchronized(customers) { 395 customers.add(customer); 396 } 397 } 398 399 /** Get the next customer number (multithread safe) 400 * @return 401 */ getNextCustomerId()402 private int getNextCustomerId() { 403 synchronized(customers) { 404 return nextCustomerId++; 405 } 406 } 407 408 /** Get the next order number (multithread safe) 409 * @return 410 */ getNextOrderId()411 private int getNextOrderId() { 412 synchronized(orders) { 413 return nextOrderId++; 414 } 415 } 416 417 /** Get the next order line number (multithread safe) 418 * @return 419 */ getNextOrderLineId()420 private int getNextOrderLineId() { 421 synchronized(orderlines) { 422 return nextOrderLineId++; 423 } 424 } 425 426 /** Add an order to the list of orders. 427 * 428 * @param order the order 429 */ addOrder(Order order)430 private void addOrder(Order order) { 431 synchronized(orders) { 432 orders.add(order); 433 } 434 } 435 436 /** Add an order line to the list of order lines. 437 * 438 * @param orderLine the order line 439 */ addOrderLine(OrderLine orderLine)440 private void addOrderLine(OrderLine orderLine) { 441 synchronized(orderlines) { 442 orderlines.add(orderLine); 443 } 444 } 445 446 } 447