1 /*
2  * File    : ThreadPool.java
3  * Created : 21-Nov-2003
4  * By      : parg
5  *
6  * Azureus - a Java Bittorrent client
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details ( see the LICENSE file ).
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21  */
22 
23 package org.gudy.azureus2.core3.util;
24 
25 /**
26  * @author parg
27  *
28  */
29 
30 import java.util.ArrayList;
31 import java.util.Iterator;
32 import java.util.List;
33 
34 import org.gudy.azureus2.core3.config.COConfigurationManager;
35 import org.gudy.azureus2.core3.config.ParameterListener;
36 
37 
38 public class
39 ThreadPool
40 {
41 	private static final boolean	NAME_THREADS = Constants.IS_CVS_VERSION && System.getProperty( "az.thread.pool.naming.enable", "true" ).equals( "true" );
42 
43 	private static final boolean	LOG_WARNINGS	= false;
44 	private static final int		WARN_TIME		= 10000;
45 
46 	private static List		busy_pools			= new ArrayList();
47 	private static boolean	busy_pool_timer_set	= false;
48 
49 	private static boolean	debug_thread_pool;
50 	private static boolean	debug_thread_pool_log_on;
51 
52 	static{
53 		if ( System.getProperty("transitory.startup", "0").equals("0")){
54 
AEDiagnostics.addEvidenceGenerator( new AEDiagnosticsEvidenceGenerator() { public void generate( IndentWriter writer ) { writer.println( R ); try{ writer.indent(); List pools; synchronized( busy_pools ){ pools = new ArrayList( busy_pools ); } for (int i=0;i<pools.size();i++){ ((ThreadPool)pools.get(i)).generateEvidence( writer ); } }finally{ writer.exdent(); } } })55 			AEDiagnostics.addEvidenceGenerator(
56 				new AEDiagnosticsEvidenceGenerator()
57 				{
58 					public void
59 					generate(
60 						IndentWriter		writer )
61 					{
62 						writer.println( "Thread Pools" );
63 
64 						try{
65 							writer.indent();
66 
67 							List	pools;
68 
69 							synchronized( busy_pools ){
70 
71 								pools	= new ArrayList( busy_pools );
72 							}
73 
74 							for (int i=0;i<pools.size();i++){
75 
76 								((ThreadPool)pools.get(i)).generateEvidence( writer );
77 							}
78 						}finally{
79 
80 							writer.exdent();
81 						}
82 					}
83 				});
84 		}
85 	}
86 
87 	private static ThreadLocal		tls	=
88 		new ThreadLocal()
89 		{
90 			public Object
91 			initialValue()
92 			{
93 				return( null );
94 			}
95 		};
96 
97 	protected static void
checkAllTimeouts()98 	checkAllTimeouts()
99 	{
100 		List	pools;
101 
102 			// copy the busy pools to avoid potential deadlock due to synchronization
103 			// nestings
104 
105 		synchronized( busy_pools ){
106 
107 			pools	= new ArrayList( busy_pools );
108 		}
109 
110 		for (int i=0;i<pools.size();i++){
111 
112 			((ThreadPool)pools.get(i)).checkTimeouts();
113 		}
114 	}
115 
116 
117 	private String	name;
118 	private int		max_size;
119 	private int		thread_name_index	= 1;
120 
121 	private long	execution_limit;
122 
123 	private List	busy;
124 	private boolean	queue_when_full;
125 	private List	task_queue	= new ArrayList();
126 
127 	private AESemaphore		thread_sem;
128 	private int				reserved_target;
129 	private int				reserved_actual;
130 
131 	private int			thread_priority	= Thread.NORM_PRIORITY;
132 	private boolean		warn_when_full;
133 
134 	private long		task_total;
135 	private long		task_total_last;
136 	private Average		task_average	= Average.getInstance( WARN_TIME, 120 );
137 
138 	private boolean		log_cpu	= false || AEThread2.TRACE_TIMES;
139 
140 	public
ThreadPool( String _name, int _max_size )141 	ThreadPool(
142 		String	_name,
143 		int		_max_size )
144 	{
145 		this( _name, _max_size, false );
146 	}
147 
148 	public
ThreadPool( String _name, int _max_size, boolean _queue_when_full )149 	ThreadPool(
150 		String	_name,
151 		int		_max_size,
152 		boolean	_queue_when_full )
153 	{
154 		name			= _name;
155 		max_size		= _max_size;
156 		queue_when_full	= _queue_when_full;
157 
158 		thread_sem = new AESemaphore( "ThreadPool::" + name, _max_size );
159 
160 		busy		= new ArrayList( _max_size );
161 	}
162 
163 	private void
generateEvidence( IndentWriter writer )164 	generateEvidence(
165 		IndentWriter		writer )
166 	{
167 		writer.println( name + ": max=" + max_size +",qwf=" + queue_when_full + ",queue=" + task_queue.size() + ",busy=" + busy.size() + ",total=" + task_total + ":" + DisplayFormatters.formatDecimal(task_average.getDoubleAverage(),2) + "/sec");
168 	}
169 
170 	public void
setWarnWhenFull()171 	setWarnWhenFull()
172 	{
173 		warn_when_full	= true;
174 	}
175 
176 	public void
setLogCPU()177 	setLogCPU()
178 	{
179 		log_cpu	= true;
180 	}
181 
182 	public int
getMaxThreads()183 	getMaxThreads()
184 	{
185 		return( max_size );
186 	}
187 
188 	public void
setThreadPriority( int _priority )189 	setThreadPriority(
190 		int	_priority )
191 	{
192 		thread_priority	= _priority;
193 	}
194 
195 	public void
setExecutionLimit( long millis )196 	setExecutionLimit(
197 		long		millis )
198 	{
199 		synchronized( this ){
200 
201 			execution_limit	= millis;
202 		}
203 	}
204 
run(AERunnable runnable)205 	public threadPoolWorker run(AERunnable runnable) {
206 		return( run(runnable, false, false));
207 	}
208 
209 
210 	/**
211 	 *
212 	 * @param runnable
213 	 * @param high_priority
214 	 *            inserts at front if tasks queueing
215 	 */
run(AERunnable runnable, boolean high_priority, boolean manualRelease)216 	public threadPoolWorker run(AERunnable runnable, boolean high_priority, boolean manualRelease) {
217 
218 		if(manualRelease && !(runnable instanceof ThreadPoolTask))
219 			throw new IllegalArgumentException("manual release only allowed for ThreadPoolTasks");
220 		else if(manualRelease)
221 			((ThreadPoolTask)runnable).setManualRelease();
222 
223 		// System.out.println( "Thread pool:" + name + " - sem = " + thread_sem.getValue() + ", queue = " + task_queue.size());
224 
225 			// not queueing, grab synchronous sem here
226 
227 		if ( !queue_when_full ){
228 
229 			if ( !thread_sem.reserveIfAvailable()){
230 
231 					// defend against recursive entry when in queuing mode (yes, it happens)
232 
233 				threadPoolWorker	recursive_worker = (threadPoolWorker)tls.get();
234 
235 				if ( recursive_worker == null || recursive_worker.getOwner() != this ){
236 
237 						// do a blocking reserve here, not recursive
238 
239 					checkWarning();
240 
241 					thread_sem.reserve();
242 
243 				}else{
244 						// run immediately
245 
246 					if ( runnable instanceof ThreadPoolTask ){
247 
248 						ThreadPoolTask task = (ThreadPoolTask)runnable;
249 
250 						task.worker = recursive_worker;
251 
252 						try{
253 							task.taskStarted();
254 
255 							runIt( runnable );
256 
257 							task.join();
258 
259 						}finally{
260 
261 							task.taskCompleted();
262 						}
263 					}else{
264 
265 						runIt( runnable );
266 					}
267 
268 					return( recursive_worker );
269 				}
270 			}
271 		}
272 
273 		threadPoolWorker allocated_worker;
274 
275 		synchronized( this ){
276 
277 			if ( high_priority )
278 				task_queue.add( 0, runnable );
279 			else
280 				task_queue.add( runnable );
281 
282 			// reserve if available is non-blocking
283 
284 			if ( queue_when_full && !thread_sem.reserveIfAvailable()){
285 
286 				allocated_worker	= null;
287 
288 				checkWarning();
289 
290 			}else{
291 
292 				allocated_worker = new threadPoolWorker();
293 
294 			}
295 		}
296 
297 		return( allocated_worker );
298 	}
299 
300 	protected void
runIt( AERunnable runnable )301 	runIt(
302 		AERunnable	runnable )
303 	{
304 		if ( log_cpu ){
305 
306 			long	start_cpu = log_cpu?AEJavaManagement.getThreadCPUTime():0;
307 			long	start_time	= SystemTime.getHighPrecisionCounter();
308 
309 			runnable.run();
310 
311 			if ( start_cpu > 0 ){
312 
313 				long	end_cpu = log_cpu?AEJavaManagement.getThreadCPUTime():0;
314 
315 				long	diff_cpu = ( end_cpu - start_cpu ) / 1000000;
316 
317 				long	end_time	= SystemTime.getHighPrecisionCounter();
318 
319 				long	diff_millis = ( end_time - start_time ) / 1000000;
320 
321 				if ( diff_cpu > 10 || diff_millis > 10){
322 
323 					System.out.println( TimeFormatter.milliStamp() + ": Thread: " + Thread.currentThread().getName() + ": " + runnable + " -> " + diff_cpu + "/" + diff_millis );
324 				}
325 			}
326 		}else{
327 
328 			runnable.run();
329 		}
330 	}
331 
checkWarning()332 	protected void checkWarning() {
333 		if (warn_when_full)
334 		{
335 			String task_names = "";
336 			try
337 			{
338 				synchronized (ThreadPool.this)
339 				{
340 					for (int i = 0; i < busy.size(); i++)
341 					{
342 						threadPoolWorker x = (threadPoolWorker) busy.get(i);
343 						AERunnable r = x.runnable;
344 						if (r != null)
345 						{
346 							String name;
347 							if (r instanceof ThreadPoolTask)
348 								name = ((ThreadPoolTask) r).getName();
349 							else
350 								name = r.getClass().getName();
351 							task_names += (task_names.length() == 0 ? "" : ",") + name;
352 						}
353 					}
354 				}
355 			} catch (Throwable e)
356 			{}
357 			Debug.out("Thread pool '" + getName() + "' is full (busy=" + task_names + ")");
358 			warn_when_full = false;
359 		}
360 	}
361 
getQueuedTasks()362 	public AERunnable[] getQueuedTasks() {
363 		synchronized (this)
364 		{
365 			AERunnable[] res = new AERunnable[task_queue.size()];
366 			task_queue.toArray(res);
367 			return (res);
368 		}
369 	}
370 
getQueueSize()371 	public int getQueueSize() {
372 		synchronized (this)
373 		{
374 			return task_queue.size();
375 		}
376 	}
377 
isQueued(AERunnable task)378 	public boolean isQueued(AERunnable task) {
379 		synchronized (this)
380 		{
381 			return task_queue.contains(task);
382 		}
383 	}
384 
385 	public AERunnable[]
getRunningTasks()386 	getRunningTasks()
387 	{
388 		List	runnables	= new ArrayList();
389 
390 		synchronized( this ){
391 
392 			Iterator	it = busy.iterator();
393 
394 			while( it.hasNext()){
395 
396 				threadPoolWorker	worker = (threadPoolWorker)it.next();
397 
398 				AERunnable	runnable = worker.getRunnable();
399 
400 				if ( runnable != null ){
401 
402 					runnables.add( runnable );
403 				}
404 			}
405 		}
406 
407 		AERunnable[]	res = new AERunnable[runnables.size()];
408 
409 		runnables.toArray(res);
410 
411 		return( res );
412 	}
413 
414 	public int
getRunningCount()415   	getRunningCount()
416   	{
417   		int	res = 0;
418 
419   		synchronized( this ){
420 
421   			Iterator	it = busy.iterator();
422 
423   			while( it.hasNext()){
424 
425   				threadPoolWorker	worker = (threadPoolWorker)it.next();
426 
427   				AERunnable	runnable = worker.getRunnable();
428 
429   				if ( runnable != null ){
430 
431   					res++;
432   				}
433   			}
434   		}
435 
436   		return( res );
437   	}
438 
439 	public boolean
isFull()440 	isFull()
441 	{
442 		return( thread_sem.getValue() == 0 );
443 	}
444 
445 	public void
setMaxThreads( int max )446 	setMaxThreads(
447 		int		max )
448 	{
449 		if ( max > max_size ){
450 
451 			Debug.out( "should support this sometime..." );
452 
453 			return;
454 		}
455 
456 		setReservedThreadCount( max_size - max );
457 	}
458 
459 	public void
setReservedThreadCount( int res )460 	setReservedThreadCount(
461 		int		res )
462 	{
463 		synchronized( this ){
464 
465 			if ( res < 0 ){
466 
467 				res = 0;
468 
469 			}else if ( res > max_size ){
470 
471 				res = max_size;
472 			}
473 
474 			int	 diff =  res - reserved_actual;
475 
476 			while( diff < 0 ){
477 
478 				thread_sem.release();
479 
480 				reserved_actual--;
481 
482 				diff++;
483 			}
484 
485 			while( diff > 0 ){
486 
487 				if ( thread_sem.reserveIfAvailable()){
488 
489 					reserved_actual++;
490 
491 					diff--;
492 
493 				}else{
494 
495 					break;
496 				}
497 			}
498 
499 			reserved_target = res;
500 		}
501 	}
502 
503 	protected void
checkTimeouts()504 	checkTimeouts()
505 	{
506 		synchronized( this ){
507 
508 			long	diff = task_total - task_total_last;
509 
510 			task_average.addValue( diff );
511 
512 			task_total_last = task_total;
513 
514 			if ( debug_thread_pool_log_on ){
515 
516 				System.out.println( "ThreadPool '" + getName() + "'/" + thread_name_index + ": max=" + max_size + ",sem=[" + thread_sem.getString() + "],busy=" + busy.size() + ",queue=" + task_queue.size());
517 			}
518 
519 			long	now = SystemTime.getMonotonousTime();
520 
521 			for (int i=0;i<busy.size();i++){
522 
523 				threadPoolWorker	x = (threadPoolWorker)busy.get(i);
524 
525 				long	elapsed = now - x.run_start_time;
526 
527 				if ( elapsed > ( (long)WARN_TIME * (x.warn_count+1))){
528 
529 					x.warn_count++;
530 
531 					if ( LOG_WARNINGS ){
532 
533 						DebugLight.out( x.getWorkerName() + ": running, elapsed = " + elapsed + ", state = " + x.state );
534 					}
535 
536 					if ( execution_limit > 0 && elapsed > execution_limit ){
537 
538 						if ( LOG_WARNINGS ){
539 
540 							DebugLight.out( x.getWorkerName() + ": interrupting" );
541 						}
542 
543 						AERunnable r = x.runnable;
544 
545 						if ( r != null ){
546 
547 							try{
548 								if ( r instanceof ThreadPoolTask ){
549 
550 									((ThreadPoolTask)r).interruptTask();
551 
552 								}else{
553 
554 									x.interrupt();
555 								}
556 							}catch( Throwable e ){
557 
558 								DebugLight.printStackTrace( e );
559 							}
560 						}
561 					}
562 				}
563 			}
564 		}
565 	}
566 
getName()567 	public String getName() {
568 		return (name);
569 	}
570 
releaseManual(ThreadPoolTask toRelease)571 	void releaseManual(ThreadPoolTask toRelease) {
572 		if( !toRelease.canManualRelease()){
573 			throw new IllegalStateException("task not manually releasable");
574 		}
575 
576 		synchronized( this ){
577 
578 			long elapsed = SystemTime.getMonotonousTime() - toRelease.worker.run_start_time;
579 			if (elapsed > WARN_TIME && LOG_WARNINGS)
580 				DebugLight.out(toRelease.worker.getWorkerName() + ": terminated, elapsed = " + elapsed + ", state = " + toRelease.worker.state);
581 
582 			if ( !busy.remove(toRelease.worker)){
583 
584 				throw new IllegalStateException("task already released");
585 			}
586 
587 			// if debug is on we leave the pool registered so that we
588 			// can trace on the timeout events
589 
590 			if (busy.size() == 0 && !debug_thread_pool){
591 
592 				synchronized (busy_pools){
593 
594 					busy_pools.remove(this);
595 				}
596 			}
597 
598 			if ( busy.size() == 0){
599 
600 				if ( reserved_target > reserved_actual ){
601 
602 					reserved_actual++;
603 
604 				}else{
605 
606 					thread_sem.release();
607 				}
608 			}else{
609 
610 				new threadPoolWorker();
611 			}
612 		}
613 
614 	}
615 
registerThreadAsChild(threadPoolWorker parent)616 	public void registerThreadAsChild(threadPoolWorker parent)
617 	{
618 		if(tls.get() == null || tls.get() == parent)
619 			tls.set(parent);
620 		else
621 			throw new IllegalStateException("another parent is already set for this thread");
622 	}
623 
deregisterThreadAsChild(threadPoolWorker parent)624 	public void deregisterThreadAsChild(threadPoolWorker parent)
625 	{
626 		if(tls.get() == parent)
627 			tls.set(null);
628 		else
629 			throw new IllegalStateException("tls is not set to parent");
630 	}
631 
632 
633 	class threadPoolWorker extends AEThread2 {
634 		private final String		worker_name;
635 		private volatile AERunnable	runnable;
636 		private long				run_start_time;
637 		private int					warn_count;
638 		private String				state	= "<none>";
639 
threadPoolWorker()640 		protected threadPoolWorker()
641 		{
642 			super(NAME_THREADS?(name + " " + (thread_name_index)):name,true);
643 			thread_name_index++;
644 			setPriority(thread_priority);
645 			worker_name = this.getName();
646 			start();
647 		}
648 
run()649 		public void run() {
650 			tls.set(threadPoolWorker.this);
651 
652 			boolean autoRelease = true;
653 
654 			try
655 			{
656 				do
657 				{
658 					try
659 					{
660 						synchronized (ThreadPool.this)
661 						{
662 							if (task_queue.size() > 0)
663 								runnable = (AERunnable) task_queue.remove(0);
664 							else
665 								break;
666 						}
667 
668 						synchronized (ThreadPool.this)
669 						{
670 							run_start_time = SystemTime.getMonotonousTime();
671 							warn_count = 0;
672 							busy.add(threadPoolWorker.this);
673 							task_total++;
674 							if (busy.size() == 1)
675 							{
676 								synchronized (busy_pools)
677 								{
678 									if (!busy_pools.contains(ThreadPool.this))
679 									{
680 										busy_pools.add(ThreadPool.this);
681 										if (!busy_pool_timer_set)
682 										{
683 											// we have to defer this action rather
684 											// than running as a static initialiser
685 											// due to the dependency between
686 											// ThreadPool, Timer and ThreadPool again
687 											COConfigurationManager.addAndFireParameterListeners(new String[] { "debug.threadpool.log.enable", "debug.threadpool.debug.trace" }, new ParameterListener()
688 											{
689 												public void parameterChanged(String name) {
690 													debug_thread_pool = COConfigurationManager.getBooleanParameter("debug.threadpool.log.enable", false);
691 													debug_thread_pool_log_on = COConfigurationManager.getBooleanParameter("debug.threadpool.debug.trace", false);
692 												}
693 											});
694 											busy_pool_timer_set = true;
695 											SimpleTimer.addPeriodicEvent("ThreadPool:timeout", WARN_TIME, new TimerEventPerformer()
696 											{
697 												public void perform(TimerEvent event) {
698 													checkAllTimeouts();
699 												}
700 											});
701 										}
702 									}
703 								}
704 							}
705 						}
706 
707 						if (runnable instanceof ThreadPoolTask)
708 						{
709 							ThreadPoolTask tpt = (ThreadPoolTask) runnable;
710 							tpt.worker = this;
711 							String task_name = NAME_THREADS?tpt.getName():null;
712 							try
713 							{
714 								if (task_name != null)
715 									setName(worker_name + "{" + task_name + "}");
716 								tpt.taskStarted();
717 								runIt(runnable);
718 							} finally
719 							{
720 								if (task_name != null)
721 									setName(worker_name);
722 
723 								if(tpt.isAutoReleaseAndAllowManual())
724 									tpt.taskCompleted();
725 								else
726 								{
727 									autoRelease = false;
728 									break;
729 								}
730 
731 							}
732 						} else
733 							runIt(runnable);
734 
735 					} catch (Throwable e)
736 					{
737 						DebugLight.printStackTrace(e);
738 					} finally
739 					{
740 						if(autoRelease)
741 						{
742 							synchronized (ThreadPool.this)
743 							{
744 								long elapsed = SystemTime.getMonotonousTime() - run_start_time;
745 								if (elapsed > WARN_TIME && LOG_WARNINGS)
746 									DebugLight.out(getWorkerName() + ": terminated, elapsed = " + elapsed + ", state = " + state);
747 
748 								busy.remove(threadPoolWorker.this);
749 
750 								// if debug is on we leave the pool registered so that we
751 								// can trace on the timeout events
752 								if (busy.size() == 0 && !debug_thread_pool)
753 									synchronized (busy_pools)
754 									{
755 										busy_pools.remove(ThreadPool.this);
756 									}
757 							}
758 						}
759 					}
760 				} while (runnable != null);
761 			} catch (Throwable e)
762 			{
763 				DebugLight.printStackTrace(e);
764 			} finally
765 			{
766 				if ( autoRelease){
767 
768 					synchronized (ThreadPool.this){
769 
770 						if ( reserved_target > reserved_actual ){
771 
772 							reserved_actual++;
773 
774 						}else{
775 
776 							thread_sem.release();
777 						}
778 					}
779 				}
780 
781 				tls.set(null);
782 			}
783 		}
784 
setState(String _state)785 		public void setState(String _state) {
786 			//System.out.println( "state = " + _state );
787 			state = _state;
788 		}
789 
getState()790 		public String getState() {
791 			return (state);
792 		}
793 
getWorkerName()794 		protected String getWorkerName() {
795 			return (worker_name);
796 		}
797 
getOwner()798 		protected ThreadPool getOwner() {
799 			return (ThreadPool.this);
800 		}
801 
getRunnable()802 		protected AERunnable getRunnable() {
803 			return (runnable);
804 		}
805 	}
806 }
807