1 /*
2  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.internal;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.api.internal.*;
29 import seda.sandStorm.core.*;
30 import seda.sandStorm.main.*;
31 import java.util.*;
32 
33 /**
34  * ThreadPool is a generic class which provides a thread pool.
35  *
36  * @author   Matt Welsh
37  */
38 
39 public class ThreadPool implements ProfilableIF {
40 
41   private static final boolean DEBUG = false;
42 
43   private StageWrapperIF stage;
44   private ManagerIF mgr;
45   private String poolname;
46   private ThreadGroup pooltg;
47   private Runnable runnable;
48   private Vector threads, stoppedThreads;
49 
50   int minThreads, maxThreads;
51 
52   private int maxAggregation;
53   private int blockTime = 1000;
54   private int idleTimeThreshold;
55   private AggThrottle aggThrottle;
56 
57   /**
58    * Create a thread pool for the given stage, manager and runnable,
59    * with the thread pool controller determining the number of threads
60    * used.
61    */
ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable)62   public ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable) {
63     this.stage = stage;
64     this.poolname = stage.getStage().getName();
65     this.mgr = mgr;
66     this.runnable = runnable;
67 
68     SandstormConfig config = mgr.getConfig();
69     if (config.getBoolean("global.batchController.enable")) {
70       aggThrottle = new AggThrottle(stage, mgr);
71     } else {
72       this.maxAggregation = config.getInt("global.batchController.maxBatch");
73     }
74 
75     threads = new Vector();
76     stoppedThreads = new Vector();
77 
78     // First look for stages.[stageName] options, then global options
79     String tag = "stages."+(stage.getStage().getName())+".threadPool.";
80     String globaltag = "global.threadPool.";
81 
82     int initialSize = config.getInt(tag+"initialThreads");
83     if (initialSize < 1) {
84       initialSize = config.getInt(globaltag+"initialThreads");
85       if (initialSize < 1) initialSize = 1;
86     }
87     minThreads = config.getInt(tag+"minThreads");
88     if (minThreads < 1) {
89       minThreads = config.getInt(globaltag+"minThreads");
90       if (minThreads < 1) minThreads = 1;
91     }
92     maxThreads = config.getInt(tag+"maxThreads");
93     if (maxThreads < 1) {
94       maxThreads = config.getInt(globaltag+"maxThreads");
95       if (maxThreads < 1) maxThreads = -1; // Infinite
96     }
97 
98     this.blockTime = config.getInt(tag+"blockTime",
99 	config.getInt(globaltag+"blockTime", blockTime));
100     this.idleTimeThreshold = config.getInt(tag+"sizeController.idleTimeThreshold",
101 	config.getInt(globaltag+"sizeController.idleTimeThreshold", blockTime));
102 
103     System.err.println("TP <"+poolname+">: initial "+initialSize+", min "+minThreads+", max "+maxThreads+", blockTime "+blockTime+", idleTime "+idleTimeThreshold);
104 
105     addThreads(initialSize, false);
106     mgr.getProfiler().add("ThreadPool <"+poolname+">", this);
107     pooltg = new ThreadGroup("TP <"+poolname+">");
108   }
109 
110   /**
111    * Create a thread pool with the given name, manager, runnable,
112    * and thread sizing parameters.
113    */
ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable, int initialThreads, int minThreads, int maxThreads, int blockTime, int idleTimeThreshold)114   public ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable,
115       int initialThreads, int minThreads, int maxThreads, int blockTime, int idleTimeThreshold) {
116     this.stage = stage;
117     this.poolname = stage.getStage().getName();
118     this.mgr = mgr;
119     this.runnable = runnable;
120 
121     SandstormConfig config = mgr.getConfig();
122     if (config.getBoolean("global.batchController.enable")) {
123       aggThrottle = new AggThrottle(stage, mgr);
124     } else {
125       this.maxAggregation = config.getInt("global.batchController.maxBatch");
126     }
127 
128     threads = new Vector();
129     stoppedThreads = new Vector();
130     if (initialThreads < 1) initialThreads = 1;
131     this.minThreads = minThreads;
132     if (this.minThreads < 1) this.minThreads = 1;
133     this.maxThreads = maxThreads;
134     //if (this.maxThreads < 1) this.maxThreads = initialThreads;
135     this.blockTime = blockTime;
136     this.idleTimeThreshold = idleTimeThreshold;
137 
138     addThreads(initialThreads, false);
139     mgr.getProfiler().add("ThreadPool <"+poolname+">", this);
140     pooltg = new ThreadGroup("TP <"+poolname+">");
141   }
142 
143   /**
144    * Create a thread pool with the given name, manager, runnable,
145    * and a fixed number of threads.
146    */
ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable, int numThreads)147   public ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable,
148       int numThreads) {
149     this.stage = stage;
150     this.poolname = stage.getStage().getName();
151     this.mgr = mgr;
152     this.runnable = runnable;
153 
154     SandstormConfig config = mgr.getConfig();
155     if (config.getBoolean("global.batchController.enable")) {
156       aggThrottle = new AggThrottle(stage, mgr);
157     } else {
158       this.maxAggregation = config.getInt("global.batchController.maxBatch");
159     }
160 
161     threads = new Vector();
162     stoppedThreads = new Vector();
163     maxThreads = minThreads = numThreads;
164     addThreads(numThreads, false);
165     mgr.getProfiler().add("ThreadPool <"+poolname+">", this);
166     pooltg = new ThreadGroup("TP <"+poolname+">");
167   }
168 
169   /**
170    * Start the thread pool.
171    */
start()172   public void start() {
173     System.err.print("TP <"+poolname+">: Starting "+numThreads()+" threads");
174     if (aggThrottle != null) {
175       System.err.println(", batchController enabled");
176     } else {
177       System.err.println(", maxBatch="+maxAggregation);
178     }
179     for (int i = 0; i < threads.size(); i++) {
180       Thread t = (Thread)threads.elementAt(i);
181       t.start();
182     }
183   }
184 
185   /**
186    * Stop the thread pool.
187    */
stop()188   public void stop() {
189     pooltg.stop();
190   }
191 
192   /**
193    * Add threads to this pool.
194    */
addThreads(int num, boolean start)195   void addThreads(int num, boolean start) {
196     synchronized (this) {
197       int numToAdd;
198       if (maxThreads < 0) {
199 	numToAdd = num;
200       } else {
201 	int numTotal = Math.min(maxThreads, numThreads()+num);
202 	numToAdd = numTotal - numThreads();
203       }
204       if ((maxThreads < 0) || (numToAdd < maxThreads)) {
205 	System.err.println("TP <"+poolname+">: Adding "+numToAdd+" threads to pool, size "+(numThreads()+numToAdd));
206       }
207       for (int i = 0; i < numToAdd; i++) {
208     	String name = "TP-"+numThreads()+" <"+poolname+">";
209 	Thread t = new Thread(pooltg, runnable, name);
210 	threads.addElement(t);
211 	mgr.getProfiler().getGraphProfiler().addThread(t, stage);
212 	if (start) t.start();
213       }
214     }
215   }
216 
217   /**
218    * Remove threads from pool.
219    */
removeThreads(int num)220   void removeThreads(int num) {
221     System.err.print("TP <"+poolname+">: Removing "+num+" threads from pool, ");
222     synchronized (this) {
223       for (int i = 0; (i < num) && (numThreads() > minThreads); i++) {
224 	Thread t = (Thread)threads.firstElement();
225 	stopThread(t);
226       }
227     }
228     System.err.println("size "+numThreads());
229   }
230 
231   /**
232    * Cause the given thread to stop execution.
233    */
stopThread(Thread t)234   void stopThread(Thread t) {
235     synchronized (this) {
236       threads.removeElement(t);
237       stoppedThreads.addElement(t);
238     }
239     System.err.println("TP <"+poolname+">: stopping thread, size "+numThreads());
240   }
241 
242   /**
243    * Return the number of threads in this pool.
244    */
numThreads()245   int numThreads() {
246     synchronized (this) {
247       return threads.size();
248     }
249   }
250 
251   /**
252    * Used by a thread to determine its queue block time.
253    */
getBlockTime()254   public long getBlockTime() {
255     return blockTime;
256   }
257 
258   /**
259    * Used by a thread to request its aggregation target from the pool.
260    */
getAggregationTarget()261   public synchronized int getAggregationTarget() {
262     if (aggThrottle != null) {
263       return aggThrottle.getAggTarget();
264     } else {
265       return maxAggregation;
266     }
267   }
268 
269   /**
270    * Used by a thread to determine whether it should exit.
271    */
timeToStop(long idleTime)272   public boolean timeToStop(long idleTime) {
273     synchronized (this) {
274       if ((idleTime > idleTimeThreshold) && (numThreads() > minThreads)) {
275 	stopThread(Thread.currentThread());
276       }
277       if (stoppedThreads.contains(Thread.currentThread())) return true;
278     }
279     return false;
280   }
281 
toString()282   public String toString() {
283     return "TP (size="+numThreads()+") for <"+poolname+">";
284   }
285 
getName()286   public String getName() {
287     return "ThreadPool <"+poolname+">";
288   }
289 
profileSize()290   public int profileSize() {
291     return numThreads();
292   }
293 
294 }
295 
296