1 /* Copyright (C) 2005-2011 Fabio Riccardi */ 2 3 package com.lightcrafts.jai.utils; 4 5 /* 6 * $RCSfile: SunTileScheduler.java,v $ 7 * 8 * Copyright (c) 2005 Sun Microsystems, Inc. All rights reserved. 9 * 10 * Use is subject to license terms. 11 * 12 * $Revision: 1.1 $ 13 * $Date: 2005/02/11 04:57:02 $ 14 * $State: Exp $ 15 */ 16 import java.awt.Point; 17 import java.awt.RenderingHints; 18 import java.awt.image.Raster; 19 import java.math.BigInteger; 20 import java.util.ArrayList; 21 import java.util.Arrays; 22 import java.util.Collections; 23 import java.util.HashMap; 24 import java.util.HashSet; 25 import java.util.Hashtable; 26 import java.util.LinkedList; 27 import java.util.List; 28 import java.util.Map; 29 import java.util.Set; 30 31 import com.lightcrafts.mediax.jai.OpImage; 32 import com.lightcrafts.mediax.jai.PlanarImage; 33 import com.lightcrafts.mediax.jai.TileCache; 34 import com.lightcrafts.mediax.jai.TileComputationListener; 35 import com.lightcrafts.mediax.jai.TileRequest; 36 import com.lightcrafts.mediax.jai.TileScheduler; 37 import com.lightcrafts.mediax.jai.util.ImagingException; 38 import com.lightcrafts.mediax.jai.util.ImagingListener; 39 import com.lightcrafts.media.jai.util.ImageUtil; 40 41 /** 42 * A class representing a request for non-prefetch background computation 43 * of tiles. The object stores the image, the indices of all tiles being 44 * requested, and references to all listeners associated with the request. 45 * 46 * <code>TileRequest</code> methods are not commented. 47 */ 48 class Request implements TileRequest { 49 50 private final TileScheduler scheduler; 51 52 final PlanarImage image; 53 final List<Point> indices; 54 final Set<TileComputationListener> listeners; 55 56 final Hashtable<Point, Integer> tileStatus; 57 58 /** 59 * Constructs a <code>Request</code>. 60 * 61 * @param scheduler The scheduler processing this request. 62 * @param image The image for which tiles are being computed. 63 * @param tileIndices The indices of the tiles to be computed. 64 * @param tileListeners The listeners to be notified of tile 65 * computation, cancellation, or failure. 66 * 67 * @exception IllegalArgumentException if <code>scheduler</code>, 68 * <code>image</code>, or <code>tileIndices</code> is 69 * <code>null</code> or if <code>tileIndices</code> is 70 * zero-length. 71 */ Request(TileScheduler scheduler, PlanarImage image, Point[] tileIndices, TileComputationListener[] tileListeners)72 Request(TileScheduler scheduler, 73 PlanarImage image, 74 Point[] tileIndices, 75 TileComputationListener[] tileListeners) { 76 77 // Save a reference to the scheduler. 78 if(scheduler == null) { 79 throw new IllegalArgumentException(); // Internal error - no message. 80 } 81 this.scheduler = scheduler; 82 83 // Save a reference to the image. 84 if(image == null) { 85 throw new IllegalArgumentException(); // Internal error - no message. 86 } 87 this.image = image; 88 89 // Ensure there is at least one tile in the request. 90 if(tileIndices == null || tileIndices.length == 0) { 91 // If this happens it is an internal programming error. 92 throw new IllegalArgumentException(); // Internal error - no message. 93 } 94 95 // Save the tile indices. 96 indices = Arrays.asList(tileIndices); 97 98 // Save references to the listeners, if any. 99 if(tileListeners != null) { 100 int numListeners = tileListeners.length; 101 if(numListeners > 0) { 102 listeners = new HashSet<TileComputationListener>(numListeners); 103 Collections.addAll(listeners, tileListeners); 104 } else { 105 listeners = null; 106 } 107 } else { 108 listeners = null; 109 } 110 111 // Initialize status table. 112 tileStatus = new Hashtable<Point, Integer>(tileIndices.length); 113 } 114 115 // --- TileRequest implementation --- 116 getImage()117 public PlanarImage getImage() { 118 return image; 119 } 120 getTileIndices()121 public Point[] getTileIndices() { 122 return indices.toArray(new Point[indices.size()]); 123 } 124 getTileListeners()125 public TileComputationListener[] getTileListeners() { 126 return listeners.toArray(new TileComputationListener[listeners.size()]); 127 } 128 isStatusAvailable()129 public boolean isStatusAvailable() { 130 return true; 131 } 132 getTileStatus(int tileX, int tileY)133 public int getTileStatus(int tileX, int tileY) { 134 Point p = new Point(tileX, tileY); 135 136 int status; 137 if(tileStatus.containsKey(p)) { 138 status = tileStatus.get(p); 139 } else { 140 status = TileRequest.TILE_STATUS_PENDING; 141 } 142 143 return status; 144 } 145 cancelTiles(Point[] tileIndices)146 public void cancelTiles(Point[] tileIndices) { 147 // Forward the call to the scheduler. 148 scheduler.cancelTiles(this, tileIndices); 149 } 150 } 151 152 /** A job to put in a job queue. */ 153 interface Job { 154 /** Computes the job required. */ compute()155 void compute(); 156 157 /** Returns <code>true</code> if the job is not done. */ notDone()158 boolean notDone(); 159 160 /** Returns the image for which tiles are being computed. */ getOwner()161 PlanarImage getOwner(); 162 163 /** 164 * Returns <code>true</code> if and only if the job should block the 165 * thread which processes it. In this case the scheduler and the 166 * processing thread must communicate using <code>wait()</code> and 167 * <code>notify()</code>. 168 */ isBlocking()169 boolean isBlocking(); 170 171 /** Returns the first exception encountered or <code>null</code>. */ getException()172 Exception getException(); 173 } 174 175 /** 176 * A <code>Job</code> which computes a single tile at a time for a 177 * non-prefetch background job queued by the version of scheduleTiles() 178 * which returns a <code>TileRequest</code>. This <code>Job</code> 179 * notifies all <code>TileComputationListener</code>s of all 180 * <code>TileRequest</code>s with which this tile is associated of 181 * whether the tile was computed or the computation failed. 182 */ 183 final class RequestJob implements Job { 184 185 final LCTileScheduler scheduler; // the TileScheduler 186 187 final PlanarImage owner; // the image this tile belongs to 188 final int tileX; // tile's X index 189 final int tileY; // tile's Y index 190 final Raster[] tiles; // the computed tiles 191 final int offset; // offset into arrays 192 193 boolean done = false; // flag indicating completion status 194 Exception exception = null; // Any exception that might have occured 195 // during computeTile 196 197 /** Constructor. */ RequestJob(LCTileScheduler scheduler, PlanarImage owner, int tileX, int tileY, Raster[] tiles, int offset)198 RequestJob(LCTileScheduler scheduler, 199 PlanarImage owner, int tileX, int tileY, 200 Raster[] tiles, int offset) { 201 this.scheduler = scheduler; 202 this.owner = owner; 203 this.tileX = tileX; 204 this.tileY = tileY; 205 this.tiles = tiles; 206 this.offset = offset; 207 } 208 209 /** 210 * Tile computation. Does the actual call to getTile(). 211 */ compute()212 public void compute() { 213 // Get the Request List. 214 List<Request> reqList; 215 synchronized(scheduler.tileRequests) { 216 // Initialize the tile ID. 217 Object tileID = LCTileScheduler.tileKey(owner, tileX, tileY); 218 219 // Remove the List of Requests from the request Map. 220 reqList = scheduler.tileRequests.remove(tileID); 221 222 // Remove the tile Job from the job Map. 223 scheduler.tileJobs.remove(tileID); 224 } 225 226 // Check whether reqList is valid in case job was cancelled while 227 // blocking on the tileRequests Map above. 228 // XXX Do not need empty check in next line? 229 if(reqList != null && !reqList.isEmpty()) { 230 // Update tile status to "processing". 231 Point p = new Point(tileX, tileY); 232 Integer tileStatus = TileRequest.TILE_STATUS_PROCESSING; 233 for (Request r : reqList) { 234 r.tileStatus.put(p, tileStatus); 235 } 236 237 try { 238 tiles[offset] = owner.getTile(tileX, tileY); 239 } catch (Exception e) { 240 exception = e; 241 } catch (Error e) { 242 exception = new Exception(e); 243 } finally { 244 // Extract the Set of all TileComputationListeners. 245 Set<TileComputationListener> listeners = LCTileScheduler.getListeners(reqList); 246 247 // XXX Do not need empty check in next line. 248 if(listeners != null && !listeners.isEmpty()) { 249 // Get TileRequests as an array for later use. 250 TileRequest[] requests = reqList.toArray(new TileRequest[reqList.size()]); 251 252 // Update tile status as needed. 253 tileStatus = exception == null ? 254 TileRequest.TILE_STATUS_COMPUTED : 255 TileRequest.TILE_STATUS_FAILED; 256 for (TileRequest r : requests) { 257 ((Request)r).tileStatus.put(p, tileStatus); 258 } 259 260 // Notify listeners. 261 if(exception == null) { 262 // Tile computation successful. 263 for (TileComputationListener listener : listeners) { 264 listener.tileComputed(scheduler, requests, 265 owner, tileX, tileY, 266 tiles[offset]); 267 } 268 } else { 269 // Tile computation unsuccessful. 270 for (TileComputationListener listener : listeners) { 271 listener.tileComputationFailure(scheduler, requests, 272 owner, tileX, tileY, 273 exception); 274 } 275 } 276 } 277 } 278 } 279 280 // Set the flag indicating job completion. 281 done = true; 282 } 283 284 /** 285 * Returns <code>true</code> if the job is not done; that is, 286 * the tile is not computed and no exceptions have occurred. 287 */ notDone()288 public boolean notDone() { 289 return !done; 290 } 291 292 /** Returns the image for which the tile is being computed. */ getOwner()293 public PlanarImage getOwner() { 294 return owner; 295 } 296 297 /** Always returns <code>true</code>. */ isBlocking()298 public boolean isBlocking() { 299 // Big Change: this should prevent enqueueing of new tiles while an image is being processed 300 return true; 301 } 302 303 /** Returns any encountered exception or <code>null</code>. */ getException()304 public Exception getException() { 305 return exception; 306 } 307 308 /** Returns a string representation of the class object. */ toString()309 public String toString() { 310 String tString = "null"; 311 if (tiles[offset] != null) { 312 tString = tiles[offset].toString(); 313 } 314 return getClass().getName() + "@" + Integer.toHexString(hashCode()) + 315 ": owner = " + owner.toString() + 316 " tileX = " + Integer.toString(tileX) + 317 " tileY = " + Integer.toString(tileY) + 318 " tile = " + tString; 319 } 320 } 321 322 /** 323 * A <code>Job</code> which computes one or more tiles at a time for either 324 * a prefetch job or a blocking job. 325 */ 326 final class TileJob implements Job { 327 328 final LCTileScheduler scheduler; // the TileScheduler 329 330 final boolean isBlocking; // whether the job is blocking 331 final PlanarImage owner; // the image this tile belongs to 332 final Point[] tileIndices; // the tile indices 333 final Raster[] tiles; // the computed tiles 334 final int offset; // offset into arrays 335 final int numTiles; // number of elements to use in indices array 336 337 boolean done = false; // flag indicating completion status 338 Exception exception = null; // The first exception that might have 339 // occured during computeTile 340 341 /** Constructor. */ TileJob(LCTileScheduler scheduler, boolean isBlocking, PlanarImage owner, Point[] tileIndices, Raster[] tiles, int offset, int numTiles)342 TileJob(LCTileScheduler scheduler, boolean isBlocking, 343 PlanarImage owner, Point[] tileIndices, 344 Raster[] tiles, int offset, int numTiles) { 345 this.scheduler = scheduler; 346 this.isBlocking = isBlocking; 347 this.owner = owner; 348 this.tileIndices = tileIndices; 349 this.tiles = tiles; 350 this.offset = offset; 351 this.numTiles = numTiles; 352 } 353 354 /** 355 * Tile computation. Does the actual calls to getTile(). 356 */ compute()357 public void compute() { 358 exception = scheduler.compute(owner, tileIndices, tiles, 359 offset, numTiles, null); 360 done = true; 361 } 362 363 /** 364 * Returns <code>true</code> if the job is not done; that is, 365 * the tile is not computed and no exceptions have occurred. 366 */ notDone()367 public boolean notDone() { 368 return !done; 369 } 370 371 /** Returns the image for which tiles are being computed. */ getOwner()372 public PlanarImage getOwner() { 373 return owner; 374 } 375 376 /** Returns <code>true</code> if and only if there is a listener. */ isBlocking()377 public boolean isBlocking() { 378 return isBlocking; 379 } 380 381 /** Returns any encountered exception or <code>null</code>. */ getException()382 public Exception getException() { 383 return exception; 384 } 385 } 386 387 /** 388 * Worker thread that takes jobs from the tile computation queue and does 389 * the actual computation. 390 */ 391 class WorkerThread extends Thread { 392 393 /** <code>Object</code> indicating the the thread should exit. */ 394 public static final Object TERMINATE = new Object(); 395 396 /** The scheduler that spawned this thread. */ 397 final LCTileScheduler scheduler; 398 399 /** Whether this is a prefetch thread. */ 400 boolean isPrefetch; 401 402 /** Constructor. */ WorkerThread(ThreadGroup group, LCTileScheduler scheduler, boolean isPrefetch)403 public WorkerThread(ThreadGroup group, 404 LCTileScheduler scheduler, 405 boolean isPrefetch) { 406 super(group, group.getName() + group.activeCount()); 407 this.scheduler = scheduler; 408 this.isPrefetch = isPrefetch; 409 410 setDaemon(true); 411 start(); 412 } 413 414 /** Does the tile computation. */ run()415 public void run() { 416 LinkedList<Object> jobQueue = scheduler.getQueue(isPrefetch); 417 418 while(true) { 419 Object dequeuedObject = null; 420 421 // Check the job queue. 422 if(jobQueue.size() > 0) { 423 // Remove the first job. 424 dequeuedObject = jobQueue.removeFirst(); 425 } else { 426 try { 427 // Wait for a notify() on the queue. 428 jobQueue.wait(); 429 continue; 430 } catch(InterruptedException ie) { 431 // Ignore: should never happen. 432 } 433 } 434 435 if(dequeuedObject == TERMINATE || 436 getThreadGroup() == null || getThreadGroup().isDestroyed()) { 437 // Remove WorkerThread from appropriate ArrayList. 438 LinkedList<Thread> threads; 439 synchronized(threads = scheduler.getWorkers(isPrefetch)) { 440 threads.remove(this); 441 } 442 443 // Exit the thread. 444 return; 445 } 446 447 Job job = (Job)dequeuedObject; 448 449 // Execute tile job. 450 if (job != null) { 451 job.compute(); 452 453 // Notify the scheduler only if the Job is blocking. 454 if(job.isBlocking()) { 455 synchronized(scheduler) { 456 scheduler.notify(); 457 } 458 } 459 } 460 } // infinite loop 461 } 462 } 463 464 /** 465 * This is Sun Microsystems' reference implementation of the 466 * <code>com.lightcrafts.mediax.jai.TileScheduler</code> interface. It provides 467 * a mechanism for scheduling tile calculation. Multi-threading is 468 * used whenever possible. 469 * 470 * @see com.lightcrafts.mediax.jai.TileScheduler 471 */ 472 public final class LCTileScheduler implements TileScheduler { 473 474 /** The default number of worker threads. */ 475 private static final int NUM_THREADS_DEFAULT = 2; 476 477 /** The default number of prefetch threads. */ 478 private static final int NUM_PREFETCH_THREADS_DEFAULT = 1; 479 480 /** The instance counter. It is used to compose the name of the 481 * ThreadGroup. 482 */ 483 private static int numInstances = 0; 484 485 /** The root ThreadGroup, which holds two sub-groups: 486 * the ThreadGroup for the standard jobs, and the ThreadGroup for 487 * the prefetch jobs. 488 */ 489 private ThreadGroup rootGroup; 490 491 /** The ThreadGroup contains all the standard jobs. */ 492 private ThreadGroup standardGroup; 493 494 /** The ThreadGroup contains all the prefetch jobs. */ 495 private ThreadGroup prefetchGroup; 496 497 /** The worker thread parallelism. */ 498 private int parallelism = NUM_THREADS_DEFAULT; 499 500 /** The processing thread parallelism. */ 501 private int prefetchParallelism = NUM_PREFETCH_THREADS_DEFAULT; 502 503 /** The worker thread priority. */ 504 private int priority = Thread.NORM_PRIORITY; 505 506 /** The prefetch thread priority. */ 507 private int prefetchPriority = Thread.MIN_PRIORITY; 508 509 /** A job queue for tiles waiting to be computed by the worker threads. */ 510 private final LinkedList<Object> queue; 511 512 /** A job queue for tiles waiting to be computed by prefetch workers. */ 513 private final LinkedList<Object> prefetchQueue; 514 515 /** 516 * A <code>LinkedList</code> of <code>WorkerThread</code>s that persist 517 * to do the actual tile computation for normal processing. This 518 * variable should never be set to <code>null</code>. 519 */ 520 private LinkedList<Thread> workers = new LinkedList<Thread>(); 521 522 /** 523 * A <code>LinkedList</code> of <code>WorkerThread</code>s that persist 524 * to do the actual tile computation for prefetch processing. This 525 * variable should never be set to <code>null</code>. 526 */ 527 private LinkedList<Thread> prefetchWorkers = new LinkedList<Thread>(); 528 529 /** 530 * The effective number of worker threads; may differ from 531 * <code>workers.size()</code> due to latency. This value should 532 * equal the size of <code>workers</code> less the number of 533 * <code>WorkerThread.TERMINATE</code>s in <code>queue</code>. 534 */ 535 private int numWorkerThreads = 0; 536 537 /** 538 * The effective number of prefetch worker threads; may differ from 539 * <code>prefetchWorkers.size()</code> due to latency. This value should 540 * equal the size of <code>prefetchWorkers</code> less the number of 541 * <code>WorkerThread.TERMINATE</code>s in <code>prefetchQueue</code>. 542 */ 543 private int numPrefetchThreads = 0; 544 545 /** 546 * <code>Map</code> of tiles currently being computed. The key is 547 * created from the image and tile indices by the <code>tileKey()</code> 548 * method. Each key is mapped to an <code>Object[1]</code> which may 549 * contain <code>null</code>, a <code>Raster</code>, or an indefinite 550 * <code>Object</code> which represent, respectively, that the tile is 551 * being computed, the tile itself, and that the tile computation failed. 552 */ 553 private final Map<Object, Object[]> tilesInProgress = new HashMap<Object, Object[]>(); 554 555 /** 556 * <code>Map</code> of tiles to <code>Request</code>s. The key is 557 * created from the image and tile indices by the <code>tileKey()</code> 558 * method. Each key is mapped to a <code>List</code> of 559 * <code>Request</code> for the tile. If there is no mapping for the 560 * tile, then there are no current requests. If a mapping exists, it 561 * should always be non-null and the <code>List</code> value should 562 * have size of at least unity. 563 */ 564 final Map<Object, List<Request>> tileRequests = new HashMap<Object, List<Request>>(); 565 566 /** 567 * <code>Map</code> of tiles to <code>Job</code>s.The key is 568 * created from the image and tile indices by the <code>tileKey()</code> 569 * method. Each key is mapped to a <code>Job</code> for the tile. If 570 * there is no mapping for the tile, then there is no enqueued 571 * <code>RequestJob</code>. 572 */ 573 Map<Object, Job> tileJobs = new HashMap<Object, Job>(); 574 575 /** The name of this instance. */ 576 private String nameOfThisInstance; 577 578 /** 579 * Returns the hash table "key" as a <code>Object</code> for this 580 * tile. For <code>PlanarImage</code> and 581 * <code>SerializableRenderedImage</code>, the key is generated by 582 * the method <code>ImageUtilgenerateID(Object) </code>. For the 583 * other cases, a <code>Long</code> object is returned. 584 * The upper 32 bits for this <code>Long</code> is the tile owner's 585 * hash code, and the lower 32 bits is the tile's index. 586 */ tileKey(PlanarImage owner, int tileX, int tileY)587 static Object tileKey(PlanarImage owner, int tileX, int tileY) { 588 long idx = tileY * (long)owner.getNumXTiles() + tileX; 589 590 BigInteger imageID = (BigInteger)owner.getImageID(); 591 byte[] buf = imageID.toByteArray(); 592 int length = buf.length; 593 byte[] buf1 = new byte[length + 8]; 594 System.arraycopy(buf, 0, buf1, 0, length); 595 for (int i = 7, j = 0; i >= 0; i--, j += 8) 596 buf1[length++] = (byte)(idx >> j); 597 return new BigInteger(buf1); 598 } 599 600 /** 601 * Returns all <code>TileComputationListener</code>s for the supplied 602 * <code>List</code> of <code>Request</code>s. 603 */ getListeners(List<Request> reqList)604 static Set<TileComputationListener> getListeners(List<Request> reqList) { 605 // Extract the Set of all TileComputationListeners. 606 HashSet<TileComputationListener> listeners = null; 607 for (Request req : reqList) { 608 // XXX Do not need empty check in next line. 609 if (req.listeners != null && !req.listeners.isEmpty()) { 610 if (listeners == null) { 611 listeners = new HashSet<TileComputationListener>(); 612 } 613 listeners.addAll(req.listeners); 614 } 615 } 616 617 return listeners; 618 } 619 620 /** 621 * Constructor. 622 * 623 * @param parallelism The number of worker threads to do tile computation. 624 * If this number is less than 1, no multi-threading is used. 625 * @param priority The priority of worker threads. 626 * @param prefetchParallelism The number of threads to do prefetching. 627 * If this number is less than 1, no multi-threading is used. 628 * @param prefetchPriority The priority of prefetch threads. 629 */ LCTileScheduler(int parallelism, int priority, int prefetchParallelism, int prefetchPriority)630 public LCTileScheduler(int parallelism, int priority, 631 int prefetchParallelism, int prefetchPriority) { 632 // Create queues and set parallelism and priority to default values. 633 this(); 634 635 setParallelism(parallelism); 636 setPriority(priority); 637 setPrefetchParallelism(prefetchParallelism); 638 setPrefetchPriority(prefetchPriority); 639 } 640 641 /** 642 * Constructor. Processing and prefetch queues are created and all 643 * parallelism and priority values are set to default values. 644 */ LCTileScheduler()645 public LCTileScheduler() { 646 queue = new LinkedList<Object>(); 647 prefetchQueue = new LinkedList<Object>(); 648 649 // The tile scheduler name. It is used to compose the name of the 650 // ThreadGroup. 651 String name = "LCTileSchedulerName"; 652 nameOfThisInstance = name + numInstances; 653 rootGroup = new ThreadGroup(nameOfThisInstance); 654 rootGroup.setDaemon(true); 655 656 standardGroup = new ThreadGroup(rootGroup, 657 nameOfThisInstance + "Standard"); 658 standardGroup.setDaemon(true); 659 660 prefetchGroup = new ThreadGroup(rootGroup, 661 nameOfThisInstance + "Prefetch"); 662 prefetchGroup.setDaemon(true); 663 664 numInstances++; 665 } 666 667 /** 668 * Tile computation. Does the actual calls to getTile(). 669 */ compute(PlanarImage owner, Point[] tileIndices, Raster[] tiles, int offset, int numTiles, Request request)670 Exception compute(PlanarImage owner, Point[] tileIndices, 671 Raster[] tiles, int offset, int numTiles, 672 Request request) { 673 Exception exception = null; 674 675 int j = offset; 676 if(request == null || request.listeners == null) { 677 for(int i = 0; i < numTiles; i++, j++) { 678 final Point p = tileIndices[j]; 679 680 try { 681 tiles[j] = owner.getTile(p.x, p.y); 682 } catch (Exception e) { 683 exception = e; 684 685 // Abort the remaining tiles in the job. 686 break; 687 } 688 } 689 } else { // listeners present 690 final Request[] reqs = new Request[] {request}; 691 for(int i = 0; i < numTiles; i++, j++) { 692 final Point p = tileIndices[j]; 693 694 // Update tile status to "processing". 695 Integer tileStatus = TileRequest.TILE_STATUS_PROCESSING; 696 request.tileStatus.put(p, tileStatus); 697 698 try { 699 tiles[j] = owner.getTile(p.x, p.y); 700 for (TileComputationListener listener : request.listeners) { 701 // Update tile status to "computed". 702 tileStatus = TileRequest.TILE_STATUS_COMPUTED; 703 request.tileStatus.put(p, tileStatus); 704 705 listener.tileComputed(this, 706 reqs, 707 owner, 708 p.x, p.y, 709 tiles[j]); 710 } 711 } catch (Exception e) { 712 exception = e; 713 714 // Abort the remaining tiles in the job. 715 break; 716 } 717 } 718 } 719 720 // If an exception occurred, notify listeners that all remaining 721 // tiles in the job have failed. 722 if(exception != null && request != null && request.listeners != null) { 723 final int lastOffset = j; 724 final int numFailed = numTiles - (lastOffset - offset); 725 726 // Mark all tiles starting with the one which generated the 727 // Exception as "failed". 728 for(int i = 0, k = lastOffset; i < numFailed; i++) { 729 Integer tileStatus = TileRequest.TILE_STATUS_FAILED; 730 request.tileStatus.put(tileIndices[k++], tileStatus); 731 } 732 733 // Notify listeners. 734 Request[] reqs = new Request[] {request}; 735 for(int i = 0, k = lastOffset; i < numFailed; i++) { 736 Point p = tileIndices[k++]; 737 for (TileComputationListener listener : request.listeners) { 738 listener.tileComputationFailure(this, reqs, 739 owner, p.x, p.y, 740 exception); 741 } 742 } 743 } 744 745 return exception; 746 } 747 748 /** 749 * Schedules a single tile for computation. 750 * 751 * @param owner The image the tiles belong to. 752 * @param tileX The tile's X index. 753 * @param tileY The tile's Y index. 754 * 755 * @exception IllegalArgumentException if <code>owner</code> is 756 * <code>null</code>. 757 * 758 * @return The computed tile 759 */ 760 // 761 // This method blocks on the 'tilesInProgress' Map to avoid simultaneous 762 // computation of the same tile in two or more different threads. The idea 763 // is to release the resources of all but one thread so that the computation 764 // occurs more quickly. The synchronization variable is an Object[] of length 765 // unity. The computed tile is passed from the computing thread to the 766 // waiting threads via the contents of this Object[]. Thus this method does 767 // not depend on the TileCache to transfer the data. 768 // scheduleTile(OpImage owner, int tileX, int tileY)769 public Raster scheduleTile(OpImage owner, 770 int tileX, 771 int tileY) { 772 if (owner == null) { 773 throw new IllegalArgumentException("Null owner"); 774 } 775 776 // Eventual tile to be returned. 777 Raster tile = null; 778 779 // Get the tile's unique ID. 780 final Object tileID = tileKey(owner, tileX, tileY); 781 782 // Set the computation flag and initialize or retrieve the tile cache. 783 boolean computeTile; 784 final Object[] cache; 785 synchronized(tilesInProgress) { 786 if(computeTile = !tilesInProgress.containsKey(tileID)) { 787 // Computing: add tile ID to the map. 788 tilesInProgress.put(tileID, cache = new Object[1]); 789 } else { 790 // Waiting: get tile cache from the Map. 791 cache = tilesInProgress.get(tileID); 792 } 793 } 794 795 if(computeTile) { 796 try { 797 try { 798 // Attempt to compute the tile. 799 tile = owner.computeTile(tileX, tileY); 800 } catch (OutOfMemoryError e) { 801 // Free some space in cache 802 TileCache tileCache = owner.getTileCache(); 803 if(tileCache != null) { 804 tileCache.removeTiles(owner); 805 } 806 try { 807 // Re-attempt to compute the tile. 808 tile = owner.computeTile(tileX, tileY); 809 } catch (OutOfMemoryError e1) { 810 // Empty the cache 811 if(tileCache != null) { 812 tileCache.flush(); 813 } 814 } 815 816 // Re-attempt to compute the tile. 817 tile = owner.computeTile(tileX, tileY); 818 } 819 } catch(Throwable e) { 820 // Re-throw the Error or Exception. 821 if(e instanceof Error) { 822 throw (Error)e; 823 } else { 824 sendExceptionToListener("RuntimeException", e); 825 } 826 } finally { 827 // Always set the cached tile to a non-null value. 828 cache[0] = tile != null ? tile : new Object(); 829 830 // Notify the thread(s). 831 cache.notifyAll(); 832 833 // Remove the tile ID from the Map. 834 tilesInProgress.remove(tileID); 835 } 836 } else { 837 // Check the cache: a null value indicates computation is 838 // still in progress. 839 if(cache[0] == null) { 840 // Wait for the computation to complete. 841 try { 842 cache.wait(); // XXX Should there be a timeout? 843 } catch(Exception e) { 844 // XXX What response here? 845 } 846 } 847 848 // Set the result only if cache contains a Raster. 849 if(cache[0] instanceof Raster) { 850 tile = (Raster)cache[0]; 851 } else { 852 throw new RuntimeException("Not a Raster instance?"); 853 } 854 } 855 856 return tile; 857 } 858 859 /** 860 * General purpose method for job creation and queueing. Note that 861 * the returned value should be ignored if the <code>listener</code> 862 * parameter is non-<code>null</code>. 863 * 864 * @param owner The image for which tile computation jobs will be queued. 865 * @param tileIndices The indices of the tiles to be computed. 866 * @param isPrefetch Whether the operation is a prefetch. 867 * @param listeners A <code>TileComputationListener</code> of the 868 * processing. May be <code>null</code>. 869 * 870 * @return The computed tiles. This value is meaningless if 871 * <code>listener</code> is non-<code>null</code>. 872 */ 873 // The allowable arguments are constained as follows: 874 // A) owner and tileIndices non-null. 875 // B) (isBlocking,isPrefetch) in {(true,false),(false,false),(false,true)} 876 // C) listeners != null <=> (isBlocking,isPrefetch) == (false,false) 877 // The returned value is one of: 878 // Raster[] <=> (isBlocking,isPrefetch) == (true,false) 879 // Integer <=> (isBlocking,isPrefetch) == (false,false) 880 // (Raster[])null <=> (isBlocking,isPrefetch) == (false,true) scheduleJob(PlanarImage owner, Point[] tileIndices, boolean isBlocking, boolean isPrefetch, TileComputationListener[] listeners)881 private Object scheduleJob(PlanarImage owner, 882 Point[] tileIndices, 883 boolean isBlocking, 884 boolean isPrefetch, 885 TileComputationListener[] listeners) { 886 if(owner == null || tileIndices == null) { 887 // null parameters 888 throw new IllegalArgumentException(); // coding error - no message 889 } else if((isBlocking || isPrefetch) && listeners != null) { 890 // listeners for blocking or prefetch job 891 throw new IllegalArgumentException(); // coding error - no message 892 } else if(isBlocking && isPrefetch) { 893 throw new IllegalArgumentException(); // coding error - no message 894 } 895 896 int numTiles = tileIndices.length; 897 Raster[] tiles = new Raster[numTiles]; 898 Object returnValue = tiles; 899 900 final int numThreads; 901 Job[] jobs = null; 902 int numJobs = 0; 903 904 synchronized(getWorkers(isPrefetch)) { 905 numThreads = getNumThreads(isPrefetch); 906 907 if(numThreads > 0) { // worker threads exist 908 if(numTiles <= numThreads || // no more tiles than threads 909 (!isBlocking && !isPrefetch)) { // non-blocking, non-prefetch 910 911 jobs = new Job[numTiles]; 912 913 if(!isBlocking && !isPrefetch) { 914 Request request = 915 new Request(this, owner, tileIndices, listeners); 916 917 // Override return value. 918 returnValue = request; 919 920 // Queue all tiles as single-tile jobs. 921 while(numJobs < numTiles) { 922 Point p = tileIndices[numJobs]; 923 924 Object tileID = tileKey(owner, p.x, p.y); 925 926 synchronized(tileRequests) { 927 List<Request> reqList = tileRequests.get(tileID); 928 if (reqList != null) { 929 // This tile is already queued in a 930 // non-blocking, non-prefetch job. 931 reqList.add(request); 932 numTiles--; 933 } else { 934 // This tile has not yet been queued. 935 reqList = new ArrayList<Request>(); 936 reqList.add(request); 937 tileRequests.put(tileID, reqList); 938 939 jobs[numJobs] = new RequestJob(this, owner, 940 p.x, p.y, 941 tiles, numJobs); 942 tileJobs.put(tileID, jobs[numJobs]); 943 944 addJob(jobs[numJobs++], false); 945 } 946 } 947 } 948 } else { // numTiles <= numThreads 949 while(numJobs < numTiles) { 950 jobs[numJobs] = new TileJob(this, 951 isBlocking, 952 owner, 953 tileIndices, 954 tiles, 955 numJobs, 956 1); 957 addJob(jobs[numJobs++], isPrefetch); 958 } 959 } 960 } else { // more tiles than worker threads 961 // Set the fraction of unqueued tiles to be processed by 962 // each worker thread. 963 float frac = 1.0F/(2.0F*numThreads); 964 965 // Set the minimum number of tiles each thread may process. 966 // If there is only one thread this will equal the total 967 // number of tiles. 968 int minTilesPerThread = numThreads == 1 ? numTiles : 969 Math.min(Math.max(1, (int)(frac*numTiles/2.0F + 0.5F)), 970 numTiles); 971 972 // Allocate the maximum possible number of multi-tile jobs. 973 // This will be larger than the actual number of jobs but 974 // a more precise calcuation is not possible and a dynamic 975 // storage object such as a Collection would not be useful 976 // since as calculated maxNumJobs = 4*numThreads if the 977 // preceeding values of "frac" and "minTilesPerThread" are 978 // 1/(2*numThreads) and frac*numTiles/2, respectively. 979 int maxNumJobs = numThreads == 1 ? 1 : 980 (int)((float)numTiles/(float)minTilesPerThread+0.5F); 981 jobs = new TileJob[maxNumJobs]; 982 983 // Set the number of enqueued tiles and the number left. 984 int numTilesQueued = 0; 985 int numTilesLeft = numTiles - numTilesQueued; 986 987 // Assign a number of tiles to each thread determined by 988 // the number of remaining tiles, the fraction of remaining 989 // tiles to be processed and the minimum chunk size. 990 while(numTilesLeft > 0) { 991 // Set the number of tiles to the pre-calculated 992 // fraction of tiles yet to be computed. 993 int numTilesInThread = (int)(frac*numTilesLeft + 0.5F); 994 995 // Ensure that the number to be processed is at 996 // least the minimum chunk size. 997 if(numTilesInThread < minTilesPerThread) { 998 numTilesInThread = minTilesPerThread; 999 } 1000 1001 // Clamp number of tiles in thread to number unqueued. 1002 if(numTilesInThread > numTilesLeft) { 1003 numTilesInThread = numTilesLeft; 1004 } 1005 1006 // Decrement the count of remaining tiles. Note that 1007 // this value will be non-negative due to the clamping 1008 // above. 1009 numTilesLeft -= numTilesInThread; 1010 1011 // If the number left is smaller than the minimum chunk 1012 // size then process these tiles in the current job. 1013 if(numTilesLeft < minTilesPerThread) { 1014 numTilesInThread += numTilesLeft; 1015 numTilesLeft = 0; 1016 } 1017 1018 // Create a job to process the number of tiles needed. 1019 jobs[numJobs] = new TileJob(this, 1020 isBlocking, 1021 owner, 1022 tileIndices, 1023 tiles, 1024 numTilesQueued, 1025 numTilesInThread); 1026 1027 // Queue the job and increment the job count. 1028 addJob(jobs[numJobs++], isPrefetch); 1029 1030 // Increment the count of tiles queued. 1031 numTilesQueued += numTilesInThread; 1032 } 1033 } // SingleTile vs. MultiTile Jobs 1034 } // numThreads > 0 1035 } // end synchronized block 1036 1037 if(numThreads != 0) { 1038 // If blocking, wait until all tiles have been computed. 1039 // There is no 'else' block for non-blocking as in that 1040 // case we just want to continue. 1041 if(isBlocking) { 1042 for (int i = 0; i < numJobs; i++) { 1043 synchronized(this) { 1044 while (jobs[i].notDone()) { 1045 try { 1046 wait(); 1047 } catch(InterruptedException ie) { 1048 // Ignore: should never happen. 1049 } 1050 } 1051 } 1052 1053 // XXX: should we re-throw the exception or 1054 // should we reschedule this job ?? krishnag 1055 Exception e = jobs[i].getException(); 1056 1057 if (e != null) { 1058 // Throw a RuntimeException with the Exception's 1059 // message concatenated with the stack trace. 1060 String message = "Exception while scheduling tiles: "; 1061 sendExceptionToListener(message, 1062 new ImagingException(message, e)); 1063 } 1064 } 1065 } 1066 } else { // numThreads == 0 1067 Request request = null; 1068 if(!isBlocking && !isPrefetch) { 1069 request = new Request(this, owner, tileIndices, listeners); 1070 returnValue = request; 1071 } 1072 1073 // no workers; sequentially compute tiles in main thread 1074 Exception e = compute(owner, tileIndices, tiles, 0, numTiles, 1075 request); 1076 1077 // Throw a RuntimeException with the Exception's 1078 // message concatenated with the stack trace. 1079 if(e != null) { 1080 String message = "Exception while scheduling tiles: "; 1081 sendExceptionToListener(message, 1082 new ImagingException(message, e)); 1083 } 1084 } 1085 1086 return returnValue; 1087 } 1088 1089 /** 1090 * Schedules multiple tiles of an image for computation. 1091 * 1092 * @param owner The image the tiles belong to. 1093 * @param tileIndices An array of tile X and Y indices. 1094 * 1095 * @return An array of computed tiles. 1096 */ scheduleTiles(OpImage owner, Point tileIndices[])1097 public Raster[] scheduleTiles(OpImage owner, 1098 Point tileIndices[]) { 1099 if (owner == null || tileIndices == null) { 1100 throw new IllegalArgumentException("Null owner or TileIndices"); 1101 } 1102 return (Raster[])scheduleJob(owner, tileIndices, true, false, null); 1103 } 1104 1105 /** 1106 * Schedule a list of tiles for computation. The supplied listeners 1107 * will be notified after each tile has been computed. This 1108 * method ideally should be non-blocking. If the <code>TileScheduler</code> 1109 * implementation uses multithreading, it is at the discretion of the 1110 * implementation which thread invokes the 1111 * <code>TileComputationListener</code> methods. 1112 */ scheduleTiles(PlanarImage target, Point[] tileIndices, TileComputationListener[] tileListeners)1113 public TileRequest scheduleTiles(PlanarImage target, Point[] tileIndices, 1114 TileComputationListener[] tileListeners) { 1115 if (target == null || tileIndices == null) { 1116 throw new IllegalArgumentException("Null owner or TileIndices"); 1117 } 1118 return (TileRequest)scheduleJob(target, tileIndices, false, false, 1119 tileListeners); 1120 } 1121 1122 /** 1123 * Issues an advisory cancellation request to the 1124 * <code>TileScheduler</code> stating that the indicated tiles of the 1125 * specified image should not be processed. The handling of this request 1126 * is at the discretion of the scheduler which may cancel tile processing 1127 * in progress and remove tiles from its internal queue, remove tiles from 1128 * the queue but not terminate current processing, or simply do nothing. 1129 * 1130 * <p> In the Sun Microsystems reference implementation of 1131 * <code>TileScheduler</code> the second tile cancellation option is 1132 * implemented, i.e., tiles are removed from the internal queue but 1133 * computation already in progress is not terminated. If there is at 1134 * least one worker thread this method should be non-blocking. Any tiles 1135 * allowed to complete computation subsequent to this call are complete 1136 * and will be treated as if they had not been cancelled, e.g., with 1137 * respect to caching, notification of registered listeners, etc. 1138 * Furthermore, cancelling a tile request in no way invalidates the tile 1139 * as a candidate for future recomputation. 1140 */ cancelTiles(TileRequest request, Point[] tileIndices)1141 public void cancelTiles(TileRequest request, Point[] tileIndices) { 1142 if(request == null) { 1143 throw new IllegalArgumentException("Null TileRequest"); 1144 } 1145 1146 Request req = (Request)request; 1147 synchronized(tileRequests) { 1148 // Save the list of all tile indices in this request. 1149 List<Point> reqIndexList = req.indices; 1150 1151 // Initialize the set of tile indices to cancel. 1152 Point[] indices; 1153 if(tileIndices != null && tileIndices.length > 0) { 1154 // Create a Set from the supplied indices. 1155 List<Point> tileIndexList = Arrays.asList(tileIndices); 1156 1157 // Retain only indices which were actually in the request. 1158 tileIndexList.retainAll(reqIndexList); 1159 1160 indices = tileIndexList.toArray(new Point[tileIndexList.size()]); 1161 } else { 1162 indices = reqIndexList.toArray(new Point[reqIndexList.size()]); 1163 } 1164 1165 // Cache status value. 1166 Integer tileStatus = TileRequest.TILE_STATUS_CANCELLED; 1167 1168 // Loop over tile indices to be cancelled. 1169 for (Point p : indices) { 1170 // Get the tile's ID. 1171 Object tileID = tileKey(req.image, p.x, p.y); 1172 1173 // Get the list of requests for this tile. 1174 List<Request> reqList = tileRequests.get(tileID); 1175 1176 // If there are none, proceed to next index. 1177 if(reqList == null) { 1178 continue; 1179 } 1180 1181 // Remove this Request from the Request List for this tile. 1182 reqList.remove(req); 1183 1184 // If the request list is now empty, dequeue the job and 1185 // remove the tile from the hashes. 1186 if(reqList.isEmpty()) { 1187 synchronized(queue) { 1188 Object job = tileJobs.remove(tileID); 1189 if(job != null) { 1190 queue.remove(job); 1191 } 1192 } 1193 tileRequests.remove(tileID); 1194 } 1195 1196 // Update tile status to "cancelled". 1197 req.tileStatus.put(p, tileStatus); 1198 1199 // Notify any listeners. 1200 if(req.listeners != null) { 1201 TileRequest[] reqArray = new TileRequest[]{req}; 1202 for (TileComputationListener listener : req.listeners) { 1203 listener.tileCancelled(this, reqArray, 1204 req.image, p.x, p.y); 1205 } 1206 } 1207 } 1208 } 1209 } 1210 1211 /** 1212 * Prefetchs a list of tiles of an image. 1213 * 1214 * @param owner The image the tiles belong to. 1215 * @param tileIndices An array of tile X and Y indices. 1216 */ prefetchTiles(PlanarImage owner, Point[] tileIndices)1217 public void prefetchTiles(PlanarImage owner, 1218 Point[] tileIndices) { 1219 if(owner == null || tileIndices == null) { 1220 throw new IllegalArgumentException("Null owner or TileIndices"); 1221 } 1222 scheduleJob(owner, tileIndices, false, true, null); 1223 } 1224 1225 /** 1226 * Suggests to the scheduler the degree of parallelism to use in 1227 * processing invocations of <code>scheduleTiles()</code>. For 1228 * example, this might set the number of threads to spawn. It is 1229 * legal to implement this method as a no-op. 1230 * 1231 * <p> In the Sun Microsystems reference implementation of TileScheduler 1232 * this method sets the number of worker threads actually used for tile 1233 * computation. Ideally this number should equal the number of processors 1234 * actually available on the system. It is the responsibility of the 1235 * application to set this value as the number of processors is not 1236 * available via the virtual machine. A parallelism value of zero 1237 * indicates that all tile computation will be effected in the primary 1238 * thread. A parallelism value of <i>N</i> indicates that there will be 1239 * <i>N</i> worker threads in addition to the primary scheduler thread. 1240 * In JAI the parallelism defaults to a value of 2 unless explicity set 1241 * by the application. 1242 * 1243 * @param parallelism The suggested degree of parallelism. 1244 * @throws IllegalArgumentException if <code>parallelism</code> 1245 * is negative. 1246 */ setParallelism(int parallelism)1247 public void setParallelism(int parallelism) { 1248 if (parallelism < 0) { 1249 throw new IllegalArgumentException("Negative Parallelism?"); 1250 } 1251 this.parallelism = parallelism; 1252 } 1253 1254 /** 1255 * Returns the degree of parallelism of the scheduler. 1256 */ getParallelism()1257 public int getParallelism() { 1258 return parallelism; 1259 } 1260 1261 /** 1262 * Identical to <code>setParallelism()</code> but applies only to 1263 * <code>prefetchTiles()</code>. 1264 */ setPrefetchParallelism(int parallelism)1265 public void setPrefetchParallelism(int parallelism) { 1266 if (parallelism < 0) { 1267 throw new IllegalArgumentException("Negative Parallelism?"); 1268 } 1269 prefetchParallelism = parallelism; 1270 } 1271 1272 /** 1273 * Identical to <code>getParallelism()</code> but applies only to 1274 * <code>prefetchTiles()</code>. 1275 */ getPrefetchParallelism()1276 public int getPrefetchParallelism() { 1277 return prefetchParallelism; 1278 } 1279 1280 /** 1281 * Suggests to the scheduler the priority to assign to processing 1282 * effected by <code>scheduleTiles()</code>. For example, this might 1283 * set thread priority. Values outside of the accepted priority range 1284 * will be clamped to the nearest extremum. An implementation may clamp 1285 * the prefetch priority to less than the scheduling priority. It is 1286 * legal to implement this method as a no-op. 1287 * 1288 * <p> In the Sun Microsystems reference implementation of TileScheduler 1289 * this method sets the priority of the worker threads used for tile 1290 * computation. Its initial value is <code>Thread.NORM_PRIORITY</code>. 1291 * 1292 * @param priority The suggested priority. 1293 */ setPriority(int priority)1294 public void setPriority(int priority) { 1295 this.priority = Math.max(Math.min(priority, Thread.MAX_PRIORITY), 1296 Thread.MIN_PRIORITY); 1297 } 1298 1299 /** 1300 * Returns the priority of <code>scheduleTiles()</code> processing. 1301 */ getPriority()1302 public int getPriority() { 1303 return priority; 1304 } 1305 1306 /** 1307 * Identical to <code>setPriority()</code> but applies only to 1308 * <code>prefetchTiles()</code>. 1309 * 1310 * <p> In the Sun Microsystems reference implementation of 1311 * <code>TileScheduler</code>, this method sets the priority of any threads 1312 * spawned to prefetch tiles. Its initial value is 1313 * <code>Thread.MIN_PRIORITY</code>. 1314 */ setPrefetchPriority(int priority)1315 public void setPrefetchPriority(int priority) { 1316 prefetchPriority = Math.max(Math.min(priority, Thread.MAX_PRIORITY), 1317 Thread.MIN_PRIORITY); 1318 } 1319 1320 /** 1321 * Identical to <code>getPriority()</code> but applies only to 1322 * <code>prefetchTiles()</code>. 1323 */ getPrefetchPriority()1324 public int getPrefetchPriority() { 1325 return prefetchPriority; 1326 } 1327 1328 /** Recreate the <code>ThreadGroup</code>is and <code>WorkThread</code>s. 1329 * This happens in the case of applet: the java plugin will exist after 1330 * the termination of the applet so that JAI and LCTileScheduler will 1331 * also exist. However, the <code>ThreadGroup</code>s are destroyed. 1332 * Thus, the old workers should be terminated and new i 1333 * <code>ThreadGroup</code> and workers should be created. 1334 */ 1335 // private synchronized void createThreadGroup(boolean isPrefetch) { createThreadGroup(boolean isPrefetch)1336 private void createThreadGroup(boolean isPrefetch) { 1337 if (rootGroup == null || rootGroup.isDestroyed()) { 1338 rootGroup = new ThreadGroup(nameOfThisInstance); 1339 rootGroup.setDaemon(true); 1340 } 1341 1342 if (isPrefetch && 1343 (prefetchGroup == null || prefetchGroup.isDestroyed())) { 1344 prefetchGroup = new ThreadGroup(rootGroup, 1345 nameOfThisInstance + "Prefetch"); 1346 prefetchGroup.setDaemon(true); 1347 } 1348 1349 if (!isPrefetch && 1350 (standardGroup == null || standardGroup.isDestroyed())) { 1351 standardGroup = new ThreadGroup(rootGroup, 1352 nameOfThisInstance + "Standard"); 1353 standardGroup.setDaemon(true); 1354 } 1355 1356 LinkedList<Thread> thr = getWorkers(isPrefetch); 1357 1358 for(Thread t : thr) { 1359 if (!t.isAlive()) 1360 thr.remove(t); 1361 } 1362 1363 if (isPrefetch) 1364 numPrefetchThreads = thr.size(); 1365 else 1366 numWorkerThreads = thr.size(); 1367 1368 } 1369 1370 /** 1371 * Returns the effective number of threads of the specified type. 1372 * This method also updates the number and priority of threads of 1373 * the specified type according to the global settings. This method 1374 * may add <code>WorkerThread.TERMINATE</code>s to the appropriate 1375 * queue if there are too many effective threads. 1376 */ getNumThreads(boolean isPrefetch)1377 private int getNumThreads(boolean isPrefetch) { 1378 createThreadGroup(isPrefetch); 1379 1380 // Local variables. 1381 LinkedList<Thread> thr = getWorkers(isPrefetch); 1382 int nthr; 1383 final int prll; 1384 final int prty; 1385 1386 // Set local variables depending on the thread type. 1387 if(isPrefetch) { 1388 nthr = numPrefetchThreads; 1389 prll = prefetchParallelism; 1390 prty = prefetchPriority; 1391 } else { 1392 nthr = numWorkerThreads; 1393 prll = parallelism; 1394 prty = priority; 1395 } 1396 1397 // Update priority if it has changed. 1398 if(nthr > 0 && 1399 (thr.get(0)).getPriority() != prty) { 1400 for (Thread t : thr) { 1401 if (t != null && t.getThreadGroup() != null) { 1402 t.setPriority(prty); 1403 } 1404 } 1405 } 1406 1407 if(nthr < prll) { 1408 // Not enough processing threads. 1409 // Add more threads at current priority. 1410 while(nthr < prll) { 1411 Thread t = 1412 new WorkerThread(isPrefetch ? prefetchGroup : standardGroup, 1413 this, isPrefetch); 1414 1415 t.setPriority(prty); 1416 thr.add(t); 1417 nthr++; 1418 } 1419 } else { 1420 // Too many processing threads: queue WorkerThread.TERMINATEs. 1421 // WorkerThread will remove itself later from the appropriate 1422 // ArrayList. 1423 while(nthr > prll) { 1424 addJob(WorkerThread.TERMINATE, isPrefetch); 1425 nthr--; 1426 } 1427 } 1428 1429 // Update the number of effective threads. 1430 if(isPrefetch) { 1431 numPrefetchThreads = nthr; 1432 } else { 1433 numWorkerThreads = nthr; 1434 } 1435 1436 return nthr; 1437 } 1438 1439 /** Returns the appropriate worker list. */ getWorkers(boolean isPrefetch)1440 LinkedList<Thread> getWorkers(boolean isPrefetch) { 1441 return isPrefetch ? workers : prefetchWorkers; 1442 } 1443 1444 /** Returns the appropriate queue. */ getQueue(boolean isPrefetch)1445 LinkedList<Object> getQueue(boolean isPrefetch) { 1446 return isPrefetch ? prefetchQueue : queue; 1447 } 1448 1449 /** Append a job to the appropriate queue. */ addJob(Object job, boolean isPrefetch)1450 private void addJob(Object job, boolean isPrefetch) { 1451 if(job == null || 1452 (job != WorkerThread.TERMINATE && !(job instanceof Job))) { 1453 // Programming error: deliberately no message. 1454 throw new IllegalArgumentException(); 1455 } 1456 1457 final LinkedList<Object> jobQueue = getQueue(isPrefetch); 1458 if(isPrefetch || 1459 jobQueue.isEmpty() || 1460 job instanceof RequestJob) { 1461 // Append job to queue. 1462 jobQueue.addLast(job); 1463 } else { 1464 // If the queue is non-empty or the job is a TileJob 1465 // insert the job after the last TileJob in the queue. 1466 boolean inserted = false; 1467 for(int idx = jobQueue.size() - 1; idx >= 0; idx--) { 1468 if(jobQueue.get(idx) instanceof TileJob) { 1469 jobQueue.add(idx+1, job); 1470 inserted = true; 1471 break; 1472 } 1473 } 1474 if(!inserted) { 1475 jobQueue.addFirst(job); 1476 } 1477 } 1478 jobQueue.notify(); 1479 } 1480 1481 /** Queue WorkerThread.TERMINATEs to all workers. */ finalize()1482 protected void finalize() throws Throwable { 1483 terminateAll(false); 1484 terminateAll(true); 1485 super.finalize(); 1486 } 1487 1488 /** Queue WorkerThread.TERMINATEs to all appropriate workers. */ terminateAll(boolean isPrefetch)1489 private void terminateAll(boolean isPrefetch) { 1490 synchronized(getWorkers(isPrefetch)) { 1491 int numThreads = isPrefetch ? 1492 numPrefetchThreads : numWorkerThreads; 1493 for(int i = 0; i < numThreads; i++) { 1494 addJob(WorkerThread.TERMINATE, isPrefetch); 1495 if(isPrefetch) { 1496 numPrefetchThreads--; 1497 } else { 1498 numWorkerThreads--; 1499 } 1500 } 1501 } 1502 } 1503 sendExceptionToListener(String message, Throwable e)1504 void sendExceptionToListener(String message, Throwable e) { 1505 ImagingListener listener = 1506 ImageUtil.getImagingListener((RenderingHints)null); 1507 listener.errorOccurred(message, e, this, false); 1508 } 1509 } 1510