1 /* 2 * Copyright (c) 2001 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.internal; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.api.internal.*; 29 import seda.sandStorm.core.*; 30 import seda.sandStorm.main.*; 31 import java.util.*; 32 import seda.util.*; 33 34 /** 35 * AggThrottle is used by thread managers to adjust their aggregation 36 * level based on observations of stage throughput. 37 * 38 * @author Matt Welsh 39 */ 40 class AggThrottle { 41 42 private static final boolean DEBUG = false; 43 44 private StageWrapperIF stage; 45 private String name; 46 private ManagerIF mgr; 47 48 private double bestThroughput, lastThroughput; 49 private int bestTarget; 50 private long lastEvents; 51 private long lastMeasurementTime; 52 private int measurementCount, adjustCount; 53 54 private static final int STATE_DECREASING = 0; 55 private static final int STATE_INCREASING = 1; 56 private int state = STATE_DECREASING; 57 private int increase_count = 0; 58 59 private static final int ADJUST_DELAY = 5; 60 61 private int minAggregation = 8; 62 // private int maxAggregation = -1; 63 private int maxAggregation = 1000; 64 private int recalcWindow = 1000; 65 private double smoothConst = 0.7; 66 67 private static final double REDUCE_FACTOR = 1.2; 68 private static final double INCREASE_FACTOR = 1.2; 69 private static final double LOW_WATER = 0.90; 70 private static final double HIGH_WATER = 0.98; 71 private static final double VERY_LOW_WATER = 0.2; 72 private static final double VERY_HIGH_WATER = 2.0; 73 74 private int aggregationTarget; 75 private Random rand = new Random(); 76 AggThrottle(StageWrapperIF stage, ManagerIF mgr)77 AggThrottle(StageWrapperIF stage, ManagerIF mgr) { 78 this.stage = stage; 79 this.name = stage.getStage().getName(); 80 this.mgr = mgr; 81 SandstormConfig config = mgr.getConfig(); 82 83 this.minAggregation = config.getInt("global.batchController.minBatch", 84 minAggregation); 85 this.maxAggregation = config.getInt("global.batchController.maxBatch", 86 maxAggregation); 87 // this.recalcWindow = config.getInt("global.batchController.recalcWindow", 88 // recalcWindow); 89 this.smoothConst = config.getDouble("global.batchController.smoothConst", 90 smoothConst); 91 92 System.err.println("AggThrottle <"+name+"> created: minBatch "+minAggregation+", maxBatch "+maxAggregation+", recalcWindow "+recalcWindow); 93 this.aggregationTarget = this.maxAggregation; 94 95 lastThroughput = 0.0; 96 bestThroughput = 0.0; 97 bestTarget = aggregationTarget; 98 lastEvents = 0; 99 lastMeasurementTime = System.currentTimeMillis(); 100 measurementCount = adjustCount = 0; 101 102 mgr.getProfiler().add("AggThrottle throughput for <"+name+">", 103 new ProfilableIF() { 104 public int profileSize() { 105 //int foo = getAggTarget(); // Recalculate 106 return (int)lastThroughput; 107 } 108 }); 109 mgr.getProfiler().add("AggThrottle bestThroughput for <"+name+">", 110 new ProfilableIF() { 111 public int profileSize() { 112 //int foo = getAggTarget(); // Recalculate 113 return (int)bestThroughput; 114 } 115 }); 116 mgr.getProfiler().add("AggThrottle aggTarget for <"+name+">", 117 new ProfilableIF() { 118 public int profileSize() { 119 //int foo = getAggTarget(); // Recalculate 120 return aggregationTarget; 121 } 122 }); 123 } 124 toString()125 public String toString() { 126 return "AggThrottle <"+name+">"; 127 } 128 getAggTarget()129 synchronized int getAggTarget() { 130 131 long cur_time = System.currentTimeMillis(); 132 long time_elapsed = cur_time - lastMeasurementTime; 133 134 if (time_elapsed < recalcWindow) { 135 return aggregationTarget; 136 } 137 138 // measurementCount++; 139 // if ((measurementCount % recalcWindow) != 0) { 140 // return aggregationTarget; 141 // } 142 143 long events = stage.getStats().getTotalEvents(); 144 long curEvents = events - lastEvents; 145 lastEvents = events; 146 147 lastMeasurementTime = cur_time; 148 149 double throughput = (curEvents * 1.0) / ((double)time_elapsed * 1.0e-3); 150 double avgThroughput = (smoothConst * lastThroughput) + ((1.0 - smoothConst) * throughput); 151 152 adjustCount++; 153 if ((adjustCount % ADJUST_DELAY) == 0) { 154 155 if (avgThroughput < (VERY_LOW_WATER*bestThroughput)) { 156 aggregationTarget = maxAggregation; 157 state = STATE_DECREASING; 158 } 159 160 if (avgThroughput >= (VERY_HIGH_WATER*bestThroughput)) { 161 aggregationTarget = maxAggregation; 162 state = STATE_DECREASING; 163 } 164 165 if (state == STATE_DECREASING) { 166 if (avgThroughput <= (LOW_WATER*bestThroughput)) { 167 // Fell below low water - increase 168 //bestThroughput = avgThroughput; 169 state = STATE_INCREASING; 170 aggregationTarget *= INCREASE_FACTOR; 171 if (aggregationTarget > maxAggregation) aggregationTarget = maxAggregation; 172 } else if (avgThroughput > bestThroughput) { 173 // Better throughput - save and decrease 174 bestThroughput = avgThroughput; 175 aggregationTarget /= REDUCE_FACTOR; 176 if (aggregationTarget < minAggregation) aggregationTarget = minAggregation; 177 } else { 178 // Just decrease 179 aggregationTarget /= REDUCE_FACTOR; 180 if (aggregationTarget < minAggregation) aggregationTarget = minAggregation; 181 } 182 183 } else if (state == STATE_INCREASING) { 184 if (avgThroughput > bestThroughput) { 185 // Better throughput - save 186 bestThroughput = avgThroughput; 187 } 188 if (avgThroughput >= (HIGH_WATER*bestThroughput)) { 189 // Start decreasing 190 state = STATE_DECREASING; 191 aggregationTarget /= REDUCE_FACTOR; 192 if (aggregationTarget < minAggregation) aggregationTarget = minAggregation; 193 // } else if (avgThroughput <= (LOW_WATER*bestThroughput)) { 194 // Fell below low water - decrease 195 //bestThroughput = avgThroughput; 196 // state = STATE_DECREASING; 197 // aggregationTarget /= REDUCE_FACTOR; 198 // if (aggregationTarget < minAggregation) aggregationTarget = minAggregation; 199 } else { 200 // Just increase 201 aggregationTarget *= INCREASE_FACTOR; 202 if (aggregationTarget > maxAggregation) { 203 // Maxed out, so save best throughput and start decreasing 204 aggregationTarget = maxAggregation; 205 state = STATE_DECREASING; 206 bestThroughput = avgThroughput; 207 } 208 } 209 } 210 211 // Randomly reset best estimate if not below LOW_WATER 212 // if (rand.nextDouble() <= 0.2) { 213 // if (avgThroughput >= (LOW_WATER*bestThroughput)) { 214 // bestThroughput = avgThroughput; 215 // } 216 // } 217 218 // Randomly switch direction 219 if (rand.nextDouble() <= 0.0) { 220 if (state == STATE_INCREASING) { 221 state = STATE_DECREASING; 222 aggregationTarget /= REDUCE_FACTOR; 223 if (aggregationTarget < minAggregation) aggregationTarget = minAggregation; 224 } else { 225 state = STATE_INCREASING; 226 aggregationTarget *= INCREASE_FACTOR; 227 if (aggregationTarget > maxAggregation) aggregationTarget = maxAggregation; 228 } 229 } 230 231 // Randomly reset 232 if (rand.nextDouble() <= 0.00) { 233 state = STATE_DECREASING; 234 aggregationTarget = maxAggregation; 235 bestThroughput = 0.0; 236 } 237 } 238 239 if (DEBUG) System.err.println("AggThrottle <"+name+">: avgThroughput "+MDWUtil.format(avgThroughput)+", last "+MDWUtil.format(lastThroughput)+", state "+((state==0)?"dec":"inc")+", aggTarget "+aggregationTarget); 240 241 //if ((adjustCount % ADJUST_DELAY) == 0) lastThroughput = avgThroughput; 242 lastThroughput = avgThroughput; 243 return aggregationTarget; 244 } 245 246 } 247 248