1 /*
2  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.internal;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.api.internal.*;
29 import seda.sandStorm.core.*;
30 import seda.sandStorm.main.*;
31 import java.util.*;
32 
33 /**
34  * TPSThreadManager provides a threadpool-per-source-per-stage thread
35  * manager implementation.
36  *
37  * @author   Matt Welsh
38  */
39 
40 public class TPSThreadManager implements ThreadManagerIF, sandStormConst {
41 
42   private static final boolean DEBUG = false;
43   private static final boolean DEBUG_VERBOSE = false;
44 
45   protected ManagerIF mgr;
46   protected SandstormConfig config;
47   protected Hashtable srTbl;
48   protected ThreadPoolController sizeController;
49 
TPSThreadManager(ManagerIF mgr)50   public TPSThreadManager(ManagerIF mgr) {
51     this(mgr, true);
52   }
53 
TPSThreadManager(ManagerIF mgr, boolean initialize)54   public TPSThreadManager(ManagerIF mgr, boolean initialize) {
55     this.mgr = mgr;
56     this.config = mgr.getConfig();
57 
58     if (initialize) {
59       if (config.getBoolean("global.threadPool.sizeController.enable")) {
60 	sizeController = new ThreadPoolController(mgr);
61       }
62       srTbl = new Hashtable();
63     }
64   }
65 
66   /**
67    * Register a stage with this thread manager.
68    */
register(StageWrapperIF stage)69   public void register(StageWrapperIF stage) {
70     // Create a threadPool for the stage
71     stageRunnable sr = new stageRunnable(stage);
72     srTbl.put(sr, stage);
73   }
74 
75   /**
76    * Deregister a stage with this thread manager.
77    */
deregister(StageWrapperIF stage)78   public void deregister(StageWrapperIF stage) {
79     Enumeration e = srTbl.keys();
80     while (e.hasMoreElements()) {
81       stageRunnable sr = (stageRunnable)e.nextElement();
82       StageWrapperIF s = (StageWrapperIF)srTbl.get(sr);
83       if (s == stage) {
84 	sr.tp.stop();
85 	srTbl.remove(sr);
86       }
87     }
88   }
89 
90   /**
91    * Stop the thread manager and all threads managed by it.
92    */
deregisterAll()93   public void deregisterAll() {
94     Enumeration e = srTbl.keys();
95     while (e.hasMoreElements()) {
96       stageRunnable sr = (stageRunnable)e.nextElement();
97       StageWrapperIF s = (StageWrapperIF)srTbl.get(sr);
98       sr.tp.stop();
99       srTbl.remove(sr);
100     }
101   }
102 
103   /**
104    * Internal class representing the Runnable for a single stage.
105    */
106   public class stageRunnable implements Runnable {
107 
108     protected ThreadPool tp;
109     protected StageWrapperIF wrapper;
110     protected SourceIF source;
111     protected String name;
112     protected ResponseTimeControllerIF rtController = null;
113     protected boolean firstToken = false;
114     protected int aggTarget = -1;
115 
stageRunnable(StageWrapperIF wrapper, ThreadPool tp)116     protected stageRunnable(StageWrapperIF wrapper, ThreadPool tp) {
117       this.wrapper = wrapper;
118       this.tp = tp;
119       this.source = wrapper.getSource();
120       this.name = wrapper.getStage().getName();
121 
122       if (tp != null) {
123 	if (sizeController != null) {
124   	  // The sizeController is globally enabled -- has the user disabled
125   	  // it for this stage?
126   	  String val = config.getString("stages."+this.name+".threadPool.sizeController.enable");
127   	  if ((val == null) || val.equals("true") || val.equals("TRUE")) {
128   	    sizeController.register(wrapper, tp);
129   	  }
130    	}
131       }
132       this.rtController = wrapper.getResponseTimeController();
133 
134       if (tp != null) tp.start();
135     }
136 
stageRunnable(StageWrapperIF wrapper)137     protected stageRunnable(StageWrapperIF wrapper) {
138       this.wrapper = wrapper;
139       this.tp = tp;
140       this.source = wrapper.getSource();
141       this.name = wrapper.getStage().getName();
142 
143       // Create a threadPool for the stage
144       if (wrapper.getEventHandler() instanceof SingleThreadedEventHandlerIF) {
145 	tp = new ThreadPool(wrapper, mgr, this, 1);
146       } else {
147 	tp = new ThreadPool(wrapper, mgr, this);
148       }
149 
150       if (sizeController != null) {
151 	// The sizeController is globally enabled -- has the user disabled
152 	// it for this stage?
153 	String val = config.getString("stages."+this.name+".threadPool.sizeController.enable");
154 	if ((val == null) || val.equals("true") || val.equals("TRUE")) {
155 	  sizeController.register(wrapper, tp);
156 	}
157       }
158       this.rtController = wrapper.getResponseTimeController();
159 
160       tp.start();
161     }
162 
run()163     public void run() {
164       int blockTime;
165       long t1, t2;
166       long tstart = 0, tend = 0;
167       boolean isFirst = false;
168 
169       if (DEBUG) System.err.println(name+": starting, source is "+source);
170 
171       t1 = System.currentTimeMillis();
172 
173       while (true) {
174 
175 	synchronized (this) {
176   	  if (firstToken == false) {
177   	    firstToken = true; isFirst = true;
178   	  }
179    	}
180 
181        	try {
182 
183 	  blockTime = (int)tp.getBlockTime();
184 	  aggTarget = tp.getAggregationTarget();
185 
186 	  if (DEBUG_VERBOSE) System.err.println(name+": Doing blocking dequeue for "+wrapper);
187 
188 	  QueueElementIF fetched[];
189 	  if (aggTarget == -1) {
190 	    if (DEBUG_VERBOSE) System.err.println("TPSTM <"+this.name+"> dequeue (aggTarget -1)");
191   	    fetched = source.blocking_dequeue_all(blockTime);
192           } else {
193 	    if (DEBUG_VERBOSE) System.err.println("TPSTM <"+this.name+"> dequeue (aggTarget "+aggTarget+")");
194   	    fetched = source.blocking_dequeue(blockTime, aggTarget);
195 	  }
196 
197 	  if (fetched == null) {
198 	    t2 = System.currentTimeMillis();
199 	    if (tp.timeToStop(t2-t1)) {
200 	      if (DEBUG) System.err.println(name+": Exiting");
201 	      if (isFirst) {
202 		synchronized (this) { firstToken = false; }
203 	      }
204 	      return;
205 	    }
206 	    continue;
207 	  }
208 
209 	  t1 = System.currentTimeMillis();
210 
211 	  if (DEBUG_VERBOSE) System.err.println(name+": Got "+fetched.length+" elements for "+wrapper);
212 
213 	  /* Process events */
214 	  tstart = System.currentTimeMillis();
215 	  wrapper.getEventHandler().handleEvents(fetched);
216 	  tend = System.currentTimeMillis();
217 
218 	  /* Record service rate */
219 	  ((StageWrapper)wrapper).getStats().recordServiceRate(fetched.length, tend-tstart);
220 
221 	  /* Run response time controller controller */
222 	  if (rtController != null) {
223 	    if (rtController instanceof ResponseTimeControllerMM1) {
224 	      ((ResponseTimeControllerMM1)rtController).adjustThreshold(fetched, tstart, tend, isFirst, tp.numThreads());
225 	    } else {
226 	      rtController.adjustThreshold(fetched, tend-tstart);
227 	    }
228 	  }
229 
230 	  if (tp.timeToStop(0)) {
231 	    if (DEBUG) System.err.println(name+": Exiting");
232 	    if (isFirst) {
233       	      synchronized (this) { firstToken = false; }
234 	    }
235 	    return;
236 	  }
237 
238 	  Thread.currentThread().yield();
239 
240 	} catch (Exception e) {
241 	  System.err.println("TPSThreadManager: appThread ["+name+"] got exception "+e);
242 	  e.printStackTrace();
243 	}
244       }
245     }
246   }
247 
248 }
249 
250