/*
 * File    : ThreadPool.java
 * Created : 21-Nov-2003
 * By      : parg
 *
 * Azureus - a Java Bittorrent client
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details ( see the LICENSE file ).
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

package org.gudy.azureus2.core3.util;

/**
 * @author parg
 *
 */

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.config.ParameterListener;


/**
 * A pool of worker threads whose concurrency is bounded by a semaphore
 * created with <code>max_size</code> permits.
 *
 * <p>Tasks are submitted through {@link #run(AERunnable)}. When the pool was
 * created with <code>queue_when_full</code> the submission is queued if no
 * permit is available; otherwise the submitting thread blocks on the
 * semaphore (unless it is itself a worker of this pool, in which case the
 * task is executed inline).
 *
 * <p>Instance state (<code>busy</code>, <code>task_queue</code>, the
 * reservation counters and task totals) is guarded by the pool's own monitor;
 * the static <code>busy_pools</code> list is guarded by its own monitor and,
 * where both are needed, is always acquired after the pool monitor.
 */
public class ThreadPool
{
    private static final boolean NAME_THREADS =
        Constants.IS_CVS_VERSION
            && System.getProperty( "az.thread.pool.naming.enable", "true" ).equals( "true" );

    private static final boolean LOG_WARNINGS = false;
    private static final int     WARN_TIME    = 10000;

    // pools that currently have at least one busy worker; also used as a lock
    // object, hence final
    private static final List busy_pools          = new ArrayList();
    private static boolean    busy_pool_timer_set = false;

    private static boolean debug_thread_pool;
    private static boolean debug_thread_pool_log_on;

    static{
        if ( System.getProperty( "transitory.startup", "0" ).equals( "0" )){

            AEDiagnostics.addEvidenceGenerator(
                new AEDiagnosticsEvidenceGenerator()
                {
                    public void
                    generate(
                        IndentWriter writer )
                    {
                        writer.println( "Thread Pools" );

                        try{
                            writer.indent();

                            List pools;

                            // copy under the lock, report outside it

                            synchronized( busy_pools ){

                                pools = new ArrayList( busy_pools );
                            }

                            for ( int i = 0; i < pools.size(); i++ ){

                                ((ThreadPool)pools.get(i)).generateEvidence( writer );
                            }
                        }finally{

                            writer.exdent();
                        }
                    }
                });
        }
    }

    // the worker the current thread is running as (or has been registered as a
    // child of); ThreadLocal's default initial value is already null
    private static final ThreadLocal tls = new ThreadLocal();

    /**
     * Runs {@link #checkTimeouts()} on every pool currently registered as busy.
     */
    protected static void
    checkAllTimeouts()
    {
        List pools;

        // copy the busy pools to avoid potential deadlock due to synchronization
        // nestings

        synchronized( busy_pools ){

            pools = new ArrayList( busy_pools );
        }

        for ( int i = 0; i < pools.size(); i++ ){

            ((ThreadPool)pools.get(i)).checkTimeouts();
        }
    }

    /**
     * One-off registration of the debug-parameter listener and the periodic
     * timeout check. Invoked from the first worker that makes any pool busy.
     *
     * <p>Must be called with the <code>busy_pools</code> monitor held; the
     * statement order (listener, flag, timer) is that of the original code.
     * Being static, the listener and timer performer created here do not hold
     * a reference to the worker or pool that happened to trigger them.
     */
    private static void
    startTimeoutMonitoring()
    {
        // we have to defer this action rather than running as a static
        // initialiser due to the dependency between ThreadPool, Timer and
        // ThreadPool again

        COConfigurationManager.addAndFireParameterListeners(
            new String[] { "debug.threadpool.log.enable", "debug.threadpool.debug.trace" },
            new ParameterListener()
            {
                public void
                parameterChanged(
                    String parameter_name )
                {
                    debug_thread_pool        = COConfigurationManager.getBooleanParameter( "debug.threadpool.log.enable", false );
                    debug_thread_pool_log_on = COConfigurationManager.getBooleanParameter( "debug.threadpool.debug.trace", false );
                }
            });

        busy_pool_timer_set = true;

        SimpleTimer.addPeriodicEvent(
            "ThreadPool:timeout",
            WARN_TIME,
            new TimerEventPerformer()
            {
                public void
                perform(
                    TimerEvent event )
                {
                    checkAllTimeouts();
                }
            });
    }


    private final String name;
    private final int    max_size;
    private int          thread_name_index = 1;

    private long execution_limit;

    private final List    busy;
    private final boolean queue_when_full;
    private final List    task_queue = new ArrayList();

    private final AESemaphore thread_sem;
    private int               reserved_target;
    private int               reserved_actual;

    private int     thread_priority = Thread.NORM_PRIORITY;
    private boolean warn_when_full;

    private long          task_total;
    private long          task_total_last;
    private final Average task_average = Average.getInstance( WARN_TIME, 120 );

    private boolean log_cpu = false || AEThread2.TRACE_TIMES;

    /**
     * Creates a pool that does not queue when full.
     *
     * @param _name     pool name, used for the semaphore and worker thread names
     * @param _max_size number of semaphore permits, i.e. maximum concurrent workers
     */
    public
    ThreadPool(
        String _name,
        int    _max_size )
    {
        this( _name, _max_size, false );
    }

    /**
     * @param _name            pool name, used for the semaphore and worker thread names
     * @param _max_size        number of semaphore permits, i.e. maximum concurrent workers
     * @param _queue_when_full if true, submissions made while no permit is
     *                         available are left on the task queue instead of
     *                         blocking the submitter
     */
    public
    ThreadPool(
        String  _name,
        int     _max_size,
        boolean _queue_when_full )
    {
        name            = _name;
        max_size        = _max_size;
        queue_when_full = _queue_when_full;

        thread_sem = new AESemaphore( "ThreadPool::" + name, _max_size );

        busy = new ArrayList( _max_size );
    }

    /**
     * Writes a one-line summary of this pool to the diagnostics writer.
     *
     * @param writer destination for the summary line
     */
    private void
    generateEvidence(
        IndentWriter writer )
    {
        int    queued;
        int    active;
        long   total;
        double average;

        // snapshot under the pool monitor, like every other reader of these
        // fields; the formatting and the write happen outside the lock

        synchronized( this ){

            queued  = task_queue.size();
            active  = busy.size();
            total   = task_total;
            average = task_average.getDoubleAverage();
        }

        writer.println( name + ": max=" + max_size + ",qwf=" + queue_when_full + ",queue=" + queued + ",busy=" + active + ",total=" + total + ":" + DisplayFormatters.formatDecimal( average, 2 ) + "/sec" );
    }

    /**
     * Arms a one-shot warning that is emitted by {@link #checkWarning()} the
     * next time a submission finds the pool full.
     */
    public void
    setWarnWhenFull()
    {
        warn_when_full = true;
    }

    /**
     * Enables CPU/elapsed time logging in {@link #runIt(AERunnable)}.
     */
    public void
    setLogCPU()
    {
        log_cpu = true;
    }

    /**
     * @return the maximum size this pool was constructed with
     */
    public int
    getMaxThreads()
    {
        return( max_size );
    }

    /**
     * @param _priority priority applied to workers created after this call
     */
    public void
    setThreadPriority(
        int _priority )
    {
        thread_priority = _priority;
    }

    /**
     * @param millis elapsed time after which {@link #checkTimeouts()} interrupts
     *               a running task; values &lt;= 0 disable interruption
     */
    public void
    setExecutionLimit(
        long millis )
    {
        synchronized( this ){

            execution_limit = millis;
        }
    }

    /**
     * Submits a task with normal priority and automatic release.
     *
     * @param runnable task to execute
     * @return see {@link #run(AERunnable, boolean, boolean)}
     */
    public threadPoolWorker run(AERunnable runnable) {
        return( run( runnable, false, false ));
    }


    /**
     * Submits a task to the pool.
     *
     * @param runnable      task to execute
     * @param high_priority inserts at front if tasks queueing
     * @param manualRelease marks the task for manual release; only permitted
     *                      when <code>runnable</code> is a ThreadPoolTask
     * @return the newly created worker; the calling worker when the task was
     *         run inline on a recursive submission; or <code>null</code> when
     *         the task was only queued because the pool is full
     * @throws IllegalArgumentException if <code>manualRelease</code> is set for
     *         a runnable that is not a ThreadPoolTask
     */
    public threadPoolWorker run(AERunnable runnable, boolean high_priority, boolean manualRelease) {

        if ( manualRelease ){

            if ( !( runnable instanceof ThreadPoolTask )){

                throw new IllegalArgumentException("manual release only allowed for ThreadPoolTasks");
            }

            ((ThreadPoolTask)runnable).setManualRelease();
        }

        // not queueing, grab synchronous sem here

        if ( !queue_when_full && !thread_sem.reserveIfAvailable()){

            // defend against recursive entry from one of this pool's own
            // workers (yes, it happens): blocking here could never be satisfied
            // by the caller itself

            threadPoolWorker recursive_worker = (threadPoolWorker)tls.get();

            boolean recursive = recursive_worker != null && recursive_worker.getOwner() == this;

            if ( recursive ){

                // run immediately on the calling worker

                if ( runnable instanceof ThreadPoolTask ){

                    ThreadPoolTask task = (ThreadPoolTask)runnable;

                    task.worker = recursive_worker;

                    try{
                        task.taskStarted();

                        runIt( runnable );

                        task.join();

                    }finally{

                        task.taskCompleted();
                    }
                }else{

                    runIt( runnable );
                }

                return( recursive_worker );
            }

            // do a blocking reserve here, not recursive

            checkWarning();

            thread_sem.reserve();
        }

        threadPoolWorker allocated_worker;

        synchronized( this ){

            if ( high_priority ){

                task_queue.add( 0, runnable );

            }else{

                task_queue.add( runnable );
            }

            // reserve if available is non-blocking

            if ( queue_when_full && !thread_sem.reserveIfAvailable()){

                // no permit: the task stays queued for an existing worker

                allocated_worker = null;

                checkWarning();

            }else{

                // a permit is held (either reserved above or just now); the new
                // worker starts itself and drains the queue

                allocated_worker = new threadPoolWorker();
            }
        }

        return( allocated_worker );
    }

    /**
     * Executes the runnable on the calling thread, optionally reporting tasks
     * that used more than 10ms of CPU or elapsed time when CPU logging is on.
     *
     * @param runnable task to execute
     */
    protected void
    runIt(
        AERunnable runnable )
    {
        if ( !log_cpu ){

            runnable.run();

            return;
        }

        long start_cpu  = AEJavaManagement.getThreadCPUTime();
        long start_time = SystemTime.getHighPrecisionCounter();

        runnable.run();

        // a non-positive start value is treated as "no CPU time available"

        if ( start_cpu > 0 ){

            long end_cpu = AEJavaManagement.getThreadCPUTime();

            long diff_cpu = ( end_cpu - start_cpu ) / 1000000;

            long end_time = SystemTime.getHighPrecisionCounter();

            long diff_millis = ( end_time - start_time ) / 1000000;

            if ( diff_cpu > 10 || diff_millis > 10 ){

                System.out.println( TimeFormatter.milliStamp() + ": Thread: " + Thread.currentThread().getName() + ": " + runnable + " -> " + diff_cpu + "/" + diff_millis );
            }
        }
    }

    /**
     * Emits the one-shot "pool is full" diagnostic, listing the tasks that are
     * currently busy, if it was armed via {@link #setWarnWhenFull()}.
     */
    protected void checkWarning() {

        if ( !warn_when_full ){

            return;
        }

        // StringBuffer rather than StringBuilder: this file sticks to
        // pre-generics collections and casts, so stay at that API level

        StringBuffer task_names = new StringBuffer();

        try{
            synchronized( ThreadPool.this ){

                for ( int i = 0; i < busy.size(); i++ ){

                    threadPoolWorker x = (threadPoolWorker)busy.get(i);

                    AERunnable r = x.runnable;

                    if ( r != null ){

                        String task_name;

                        if ( r instanceof ThreadPoolTask ){

                            task_name = ((ThreadPoolTask)r).getName();

                        }else{

                            task_name = r.getClass().getName();
                        }

                        if ( task_names.length() > 0 ){

                            task_names.append( "," );
                        }

                        task_names.append( task_name );
                    }
                }
            }
        }catch( Throwable e ){

            // collecting the names is best-effort: report the failure but still
            // emit the warning below

            DebugLight.printStackTrace( e );
        }

        Debug.out("Thread pool '" + getName() + "' is full (busy=" + task_names + ")");

        warn_when_full = false;
    }

    /**
     * @return a snapshot of the tasks waiting on the queue
     */
    public AERunnable[] getQueuedTasks() {

        synchronized( this ){

            AERunnable[] res = new AERunnable[task_queue.size()];

            task_queue.toArray( res );

            return( res );
        }
    }

    /**
     * @return number of tasks waiting on the queue
     */
    public int getQueueSize() {

        synchronized( this ){

            return( task_queue.size());
        }
    }

    /**
     * @param task task to look for
     * @return true if the task is currently on the queue
     */
    public boolean isQueued(AERunnable task) {

        synchronized( this ){

            return( task_queue.contains( task ));
        }
    }

    /**
     * @return a snapshot of the runnables held by the busy workers
     */
    public AERunnable[]
    getRunningTasks()
    {
        List runnables = new ArrayList();

        synchronized( this ){

            Iterator it = busy.iterator();

            while( it.hasNext()){

                threadPoolWorker worker = (threadPoolWorker)it.next();

                AERunnable runnable = worker.getRunnable();

                if ( runnable != null ){

                    runnables.add( runnable );
                }
            }
        }

        AERunnable[] res = new AERunnable[runnables.size()];

        runnables.toArray( res );

        return( res );
    }

    /**
     * @return number of busy workers that hold a runnable
     */
    public int
    getRunningCount()
    {
        int res = 0;

        synchronized( this ){

            Iterator it = busy.iterator();

            while( it.hasNext()){

                threadPoolWorker worker = (threadPoolWorker)it.next();

                if ( worker.getRunnable() != null ){

                    res++;
                }
            }
        }

        return( res );
    }

    /**
     * @return true if the semaphore currently has no free permits
     */
    public boolean
    isFull()
    {
        return( thread_sem.getValue() == 0 );
    }

    /**
     * Lowers the effective thread limit by reserving the difference between
     * the constructed maximum and <code>max</code>. Raising the limit above
     * the constructed maximum is not supported and only logs a message.
     *
     * @param max desired effective maximum
     */
    public void
    setMaxThreads(
        int max )
    {
        if ( max > max_size ){

            Debug.out( "should support this sometime..." );

            return;
        }

        setReservedThreadCount( max_size - max );
    }

    /**
     * Sets how many permits are withheld from the pool. The value is clamped
     * to <code>[0, max_size]</code>. Permits are given back immediately when
     * the reservation shrinks; when it grows, only permits that are free right
     * now are taken here and the rest are collected as workers finish.
     *
     * @param res number of permits to withhold
     */
    public void
    setReservedThreadCount(
        int res )
    {
        synchronized( this ){

            if ( res < 0 ){

                res = 0;

            }else if ( res > max_size ){

                res = max_size;
            }

            int diff = res - reserved_actual;

            // shrinking the reservation: hand permits back

            while( diff < 0 ){

                thread_sem.release();

                reserved_actual--;

                diff++;
            }

            // growing the reservation: take whatever is free now

            while( diff > 0 ){

                if ( !thread_sem.reserveIfAvailable()){

                    break;
                }

                reserved_actual++;

                diff--;
            }

            reserved_target = res;
        }
    }

    /**
     * Periodic housekeeping: feeds the task-rate average, optionally traces
     * the pool state and, when an execution limit is set, interrupts workers
     * that have exceeded it.
     */
    protected void
    checkTimeouts()
    {
        synchronized( this ){

            long diff = task_total - task_total_last;

            task_average.addValue( diff );

            task_total_last = task_total;

            if ( debug_thread_pool_log_on ){

                System.out.println( "ThreadPool '" + getName() + "'/" + thread_name_index + ": max=" + max_size + ",sem=[" + thread_sem.getString() + "],busy=" + busy.size() + ",queue=" + task_queue.size());
            }

            long now = SystemTime.getMonotonousTime();

            for ( int i = 0; i < busy.size(); i++ ){

                threadPoolWorker x = (threadPoolWorker)busy.get(i);

                long elapsed = now - x.run_start_time;

                // only act once per WARN_TIME interval of a worker's run

                if ( elapsed <= ( (long)WARN_TIME * ( x.warn_count + 1 ))){

                    continue;
                }

                x.warn_count++;

                if ( LOG_WARNINGS ){

                    DebugLight.out( x.getWorkerName() + ": running, elapsed = " + elapsed + ", state = " + x.state );
                }

                if ( execution_limit > 0 && elapsed > execution_limit ){

                    if ( LOG_WARNINGS ){

                        DebugLight.out( x.getWorkerName() + ": interrupting" );
                    }

                    AERunnable r = x.runnable;

                    if ( r != null ){

                        try{
                            if ( r instanceof ThreadPoolTask ){

                                ((ThreadPoolTask)r).interruptTask();

                            }else{

                                x.interrupt();
                            }
                        }catch( Throwable e ){

                            DebugLight.printStackTrace( e );
                        }
                    }
                }
            }
        }
    }

    /**
     * @return the pool name
     */
    public String getName() {
        return( name );
    }

    /**
     * Releases the resources still held on behalf of a manually released task
     * after its worker thread has left the run loop.
     *
     * @param toRelease the task being released
     * @throws IllegalStateException if the task is not manually releasable or
     *         its worker is no longer registered as busy
     */
    void releaseManual(ThreadPoolTask toRelease) {

        if ( !toRelease.canManualRelease()){

            throw new IllegalStateException("task not manually releasable");
        }

        synchronized( this ){

            threadPoolWorker worker = toRelease.worker;

            long elapsed = SystemTime.getMonotonousTime() - worker.run_start_time;

            if ( elapsed > WARN_TIME && LOG_WARNINGS ){

                DebugLight.out(worker.getWorkerName() + ": terminated, elapsed = " + elapsed + ", state = " + worker.state);
            }

            if ( !busy.remove( worker )){

                throw new IllegalStateException("task already released");
            }

            // if debug is on we leave the pool registered so that we
            // can trace on the timeout events

            if ( busy.size() == 0 && !debug_thread_pool ){

                synchronized( busy_pools ){

                    busy_pools.remove( this );
                }
            }

            // NOTE(review): the choice between returning the permit and
            // spawning a replacement worker is keyed on busy being empty, not
            // on task_queue - confirm this is intended

            if ( busy.size() == 0 ){

                if ( reserved_target > reserved_actual ){

                    // keep the permit to satisfy an outstanding reservation

                    reserved_actual++;

                }else{

                    thread_sem.release();
                }
            }else{

                // the permit is handed to a fresh worker, which starts itself

                new threadPoolWorker();
            }
        }
    }

    /**
     * Associates the calling thread with <code>parent</code> so that
     * submissions made from it are recognised as recursive.
     *
     * @param parent worker to associate with
     * @throws IllegalStateException if the thread is already associated with a
     *         different worker
     */
    public void registerThreadAsChild(threadPoolWorker parent)
    {
        Object current = tls.get();

        if ( current != null && current != parent ){

            throw new IllegalStateException("another parent is already set for this thread");
        }

        tls.set( parent );
    }

    /**
     * Removes the association made by
     * {@link #registerThreadAsChild(threadPoolWorker)}.
     *
     * @param parent worker the thread is expected to be associated with
     * @throws IllegalStateException if the thread is not associated with
     *         <code>parent</code>
     */
    public void deregisterThreadAsChild(threadPoolWorker parent)
    {
        if ( tls.get() != parent ){

            throw new IllegalStateException("tls is not set to parent");
        }

        tls.set( null );
    }


    /**
     * A pool thread. It starts itself on construction and keeps taking tasks
     * from the owning pool's queue until the queue is empty or a task takes
     * over responsibility for releasing it.
     */
    class threadPoolWorker extends AEThread2 {

        private final String        worker_name;
        private volatile AERunnable runnable;
        private long                run_start_time;
        private int                 warn_count;
        private String              state = "<none>";

        /**
         * Creates and immediately starts the worker. Callers in this file
         * construct workers while holding the pool monitor, which is what
         * protects <code>thread_name_index</code>.
         */
        protected threadPoolWorker()
        {
            super( NAME_THREADS ? ( name + " " + ( thread_name_index )) : name, true );

            thread_name_index++;

            setPriority( thread_priority );

            worker_name = this.getName();

            start();
        }

        public void run() {

            tls.set( threadPoolWorker.this );

            // cleared when a task opts out of automatic release, in which case
            // ThreadPool.releaseManual does the bookkeeping later
            boolean autoRelease = true;

            try{
                do{
                    try{
                        synchronized( ThreadPool.this ){

                            if ( task_queue.size() > 0 ){

                                runnable = (AERunnable)task_queue.remove( 0 );

                            }else{

                                break;
                            }
                        }

                        synchronized( ThreadPool.this ){

                            run_start_time = SystemTime.getMonotonousTime();
                            warn_count     = 0;

                            busy.add( threadPoolWorker.this );

                            task_total++;

                            if ( busy.size() == 1 ){

                                // pool has just become busy: register it for
                                // timeout checks

                                synchronized( busy_pools ){

                                    if ( !busy_pools.contains( ThreadPool.this )){

                                        busy_pools.add( ThreadPool.this );

                                        if ( !busy_pool_timer_set ){

                                            startTimeoutMonitoring();
                                        }
                                    }
                                }
                            }
                        }

                        if ( runnable instanceof ThreadPoolTask ){

                            ThreadPoolTask tpt = (ThreadPoolTask)runnable;

                            tpt.worker = this;

                            String task_name = NAME_THREADS ? tpt.getName() : null;

                            try{
                                if ( task_name != null ){

                                    setName( worker_name + "{" + task_name + "}" );
                                }

                                tpt.taskStarted();

                                runIt( runnable );

                            }finally{

                                if ( task_name != null ){

                                    setName( worker_name );
                                }

                                if ( tpt.isAutoReleaseAndAllowManual()){

                                    tpt.taskCompleted();

                                }else{

                                    // leave the loop without any release; note
                                    // that a break inside finally also discards
                                    // an exception thrown by the task

                                    autoRelease = false;

                                    break;
                                }
                            }
                        }else{

                            runIt( runnable );
                        }

                    }catch( Throwable e ){

                        DebugLight.printStackTrace( e );

                    }finally{

                        if ( autoRelease ){

                            synchronized( ThreadPool.this ){

                                long elapsed = SystemTime.getMonotonousTime() - run_start_time;

                                if ( elapsed > WARN_TIME && LOG_WARNINGS ){

                                    DebugLight.out(getWorkerName() + ": terminated, elapsed = " + elapsed + ", state = " + state);
                                }

                                busy.remove( threadPoolWorker.this );

                                // if debug is on we leave the pool registered so that we
                                // can trace on the timeout events

                                if ( busy.size() == 0 && !debug_thread_pool ){

                                    synchronized( busy_pools ){

                                        busy_pools.remove( ThreadPool.this );
                                    }
                                }
                            }
                        }
                    }
                }while( runnable != null );

            }catch( Throwable e ){

                DebugLight.printStackTrace( e );

            }finally{

                if ( autoRelease ){

                    synchronized( ThreadPool.this ){

                        if ( reserved_target > reserved_actual ){

                            // keep the permit to satisfy an outstanding reservation

                            reserved_actual++;

                        }else{

                            thread_sem.release();
                        }
                    }
                }

                tls.set( null );
            }
        }

        /**
         * @param _state free-form state text shown in diagnostics
         */
        public void setState(String _state) {
            state = _state;
        }

        /**
         * @return the last state set via {@link #setState(String)}
         */
        public String getState() {
            return( state );
        }

        /**
         * @return the thread name assigned at construction
         */
        protected String getWorkerName() {
            return( worker_name );
        }

        /**
         * @return the pool this worker belongs to
         */
        protected ThreadPool getOwner() {
            return( ThreadPool.this );
        }

        /**
         * @return the runnable most recently taken from the queue, or null if
         *         none has been taken yet
         */
        protected AERunnable getRunnable() {
            return( runnable );
        }
    }
}