1 /* 2 * Copyright (c) 2001 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.internal; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.api.internal.*; 29 import seda.sandStorm.core.*; 30 import seda.sandStorm.main.*; 31 import java.util.*; 32 33 /** 34 * TPSThreadManager provides a threadpool-per-source-per-stage thread 35 * manager implementation. 36 * 37 * @author Matt Welsh 38 */ 39 40 public class TPSThreadManager implements ThreadManagerIF, sandStormConst { 41 42 private static final boolean DEBUG = false; 43 private static final boolean DEBUG_VERBOSE = false; 44 45 protected ManagerIF mgr; 46 protected SandstormConfig config; 47 protected Hashtable srTbl; 48 protected ThreadPoolController sizeController; 49 TPSThreadManager(ManagerIF mgr)50 public TPSThreadManager(ManagerIF mgr) { 51 this(mgr, true); 52 } 53 TPSThreadManager(ManagerIF mgr, boolean initialize)54 public TPSThreadManager(ManagerIF mgr, boolean initialize) { 55 this.mgr = mgr; 56 this.config = mgr.getConfig(); 57 58 if (initialize) { 59 if (config.getBoolean("global.threadPool.sizeController.enable")) { 60 sizeController = new ThreadPoolController(mgr); 61 } 62 srTbl = new Hashtable(); 63 } 64 } 65 66 /** 67 * Register a stage with this thread manager. 68 */ register(StageWrapperIF stage)69 public void register(StageWrapperIF stage) { 70 // Create a threadPool for the stage 71 stageRunnable sr = new stageRunnable(stage); 72 srTbl.put(sr, stage); 73 } 74 75 /** 76 * Deregister a stage with this thread manager. 77 */ deregister(StageWrapperIF stage)78 public void deregister(StageWrapperIF stage) { 79 Enumeration e = srTbl.keys(); 80 while (e.hasMoreElements()) { 81 stageRunnable sr = (stageRunnable)e.nextElement(); 82 StageWrapperIF s = (StageWrapperIF)srTbl.get(sr); 83 if (s == stage) { 84 sr.tp.stop(); 85 srTbl.remove(sr); 86 } 87 } 88 } 89 90 /** 91 * Stop the thread manager and all threads managed by it. 92 */ deregisterAll()93 public void deregisterAll() { 94 Enumeration e = srTbl.keys(); 95 while (e.hasMoreElements()) { 96 stageRunnable sr = (stageRunnable)e.nextElement(); 97 StageWrapperIF s = (StageWrapperIF)srTbl.get(sr); 98 sr.tp.stop(); 99 srTbl.remove(sr); 100 } 101 } 102 103 /** 104 * Internal class representing the Runnable for a single stage. 105 */ 106 public class stageRunnable implements Runnable { 107 108 protected ThreadPool tp; 109 protected StageWrapperIF wrapper; 110 protected SourceIF source; 111 protected String name; 112 protected ResponseTimeControllerIF rtController = null; 113 protected boolean firstToken = false; 114 protected int aggTarget = -1; 115 stageRunnable(StageWrapperIF wrapper, ThreadPool tp)116 protected stageRunnable(StageWrapperIF wrapper, ThreadPool tp) { 117 this.wrapper = wrapper; 118 this.tp = tp; 119 this.source = wrapper.getSource(); 120 this.name = wrapper.getStage().getName(); 121 122 if (tp != null) { 123 if (sizeController != null) { 124 // The sizeController is globally enabled -- has the user disabled 125 // it for this stage? 126 String val = config.getString("stages."+this.name+".threadPool.sizeController.enable"); 127 if ((val == null) || val.equals("true") || val.equals("TRUE")) { 128 sizeController.register(wrapper, tp); 129 } 130 } 131 } 132 this.rtController = wrapper.getResponseTimeController(); 133 134 if (tp != null) tp.start(); 135 } 136 stageRunnable(StageWrapperIF wrapper)137 protected stageRunnable(StageWrapperIF wrapper) { 138 this.wrapper = wrapper; 139 this.tp = tp; 140 this.source = wrapper.getSource(); 141 this.name = wrapper.getStage().getName(); 142 143 // Create a threadPool for the stage 144 if (wrapper.getEventHandler() instanceof SingleThreadedEventHandlerIF) { 145 tp = new ThreadPool(wrapper, mgr, this, 1); 146 } else { 147 tp = new ThreadPool(wrapper, mgr, this); 148 } 149 150 if (sizeController != null) { 151 // The sizeController is globally enabled -- has the user disabled 152 // it for this stage? 153 String val = config.getString("stages."+this.name+".threadPool.sizeController.enable"); 154 if ((val == null) || val.equals("true") || val.equals("TRUE")) { 155 sizeController.register(wrapper, tp); 156 } 157 } 158 this.rtController = wrapper.getResponseTimeController(); 159 160 tp.start(); 161 } 162 run()163 public void run() { 164 int blockTime; 165 long t1, t2; 166 long tstart = 0, tend = 0; 167 boolean isFirst = false; 168 169 if (DEBUG) System.err.println(name+": starting, source is "+source); 170 171 t1 = System.currentTimeMillis(); 172 173 while (true) { 174 175 synchronized (this) { 176 if (firstToken == false) { 177 firstToken = true; isFirst = true; 178 } 179 } 180 181 try { 182 183 blockTime = (int)tp.getBlockTime(); 184 aggTarget = tp.getAggregationTarget(); 185 186 if (DEBUG_VERBOSE) System.err.println(name+": Doing blocking dequeue for "+wrapper); 187 188 QueueElementIF fetched[]; 189 if (aggTarget == -1) { 190 if (DEBUG_VERBOSE) System.err.println("TPSTM <"+this.name+"> dequeue (aggTarget -1)"); 191 fetched = source.blocking_dequeue_all(blockTime); 192 } else { 193 if (DEBUG_VERBOSE) System.err.println("TPSTM <"+this.name+"> dequeue (aggTarget "+aggTarget+")"); 194 fetched = source.blocking_dequeue(blockTime, aggTarget); 195 } 196 197 if (fetched == null) { 198 t2 = System.currentTimeMillis(); 199 if (tp.timeToStop(t2-t1)) { 200 if (DEBUG) System.err.println(name+": Exiting"); 201 if (isFirst) { 202 synchronized (this) { firstToken = false; } 203 } 204 return; 205 } 206 continue; 207 } 208 209 t1 = System.currentTimeMillis(); 210 211 if (DEBUG_VERBOSE) System.err.println(name+": Got "+fetched.length+" elements for "+wrapper); 212 213 /* Process events */ 214 tstart = System.currentTimeMillis(); 215 wrapper.getEventHandler().handleEvents(fetched); 216 tend = System.currentTimeMillis(); 217 218 /* Record service rate */ 219 ((StageWrapper)wrapper).getStats().recordServiceRate(fetched.length, tend-tstart); 220 221 /* Run response time controller controller */ 222 if (rtController != null) { 223 if (rtController instanceof ResponseTimeControllerMM1) { 224 ((ResponseTimeControllerMM1)rtController).adjustThreshold(fetched, tstart, tend, isFirst, tp.numThreads()); 225 } else { 226 rtController.adjustThreshold(fetched, tend-tstart); 227 } 228 } 229 230 if (tp.timeToStop(0)) { 231 if (DEBUG) System.err.println(name+": Exiting"); 232 if (isFirst) { 233 synchronized (this) { firstToken = false; } 234 } 235 return; 236 } 237 238 Thread.currentThread().yield(); 239 240 } catch (Exception e) { 241 System.err.println("TPSThreadManager: appThread ["+name+"] got exception "+e); 242 e.printStackTrace(); 243 } 244 } 245 } 246 } 247 248 } 249 250