1 /*
2  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.internal;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.api.internal.*;
29 import seda.sandStorm.core.*;
30 import seda.sandStorm.main.*;
31 import java.util.*;
32 import seda.util.*;
33 
34 /**
35  * AggThrottle is used by thread managers to adjust their aggregation
36  * level based on observations of stage throughput.
37  *
38  * @author Matt Welsh
39  */
40 class AggThrottle {
41 
42   private static final boolean DEBUG = false;
43 
44   private StageWrapperIF stage;
45   private String name;
46   private ManagerIF mgr;
47 
48   private double bestThroughput, lastThroughput;
49   private int bestTarget;
50   private long lastEvents;
51   private long lastMeasurementTime;
52   private int measurementCount, adjustCount;
53 
54   private static final int STATE_DECREASING = 0;
55   private static final int STATE_INCREASING = 1;
56   private int state = STATE_DECREASING;
57   private int increase_count = 0;
58 
59   private static final int ADJUST_DELAY = 5;
60 
61   private int minAggregation = 8;
62 //  private int maxAggregation = -1;
63   private int maxAggregation = 1000;
64   private int recalcWindow = 1000;
65   private double smoothConst = 0.7;
66 
67   private static final double REDUCE_FACTOR = 1.2;
68   private static final double INCREASE_FACTOR = 1.2;
69   private static final double LOW_WATER = 0.90;
70   private static final double HIGH_WATER = 0.98;
71   private static final double VERY_LOW_WATER = 0.2;
72   private static final double VERY_HIGH_WATER = 2.0;
73 
74   private int aggregationTarget;
75   private Random rand = new Random();
76 
AggThrottle(StageWrapperIF stage, ManagerIF mgr)77   AggThrottle(StageWrapperIF stage, ManagerIF mgr) {
78     this.stage = stage;
79     this.name = stage.getStage().getName();
80     this.mgr = mgr;
81     SandstormConfig config = mgr.getConfig();
82 
83     this.minAggregation = config.getInt("global.batchController.minBatch",
84 	minAggregation);
85     this.maxAggregation = config.getInt("global.batchController.maxBatch",
86 	maxAggregation);
87 //    this.recalcWindow = config.getInt("global.batchController.recalcWindow",
88 //	recalcWindow);
89     this.smoothConst = config.getDouble("global.batchController.smoothConst",
90 	smoothConst);
91 
92     System.err.println("AggThrottle <"+name+"> created: minBatch "+minAggregation+", maxBatch "+maxAggregation+", recalcWindow "+recalcWindow);
93     this.aggregationTarget = this.maxAggregation;
94 
95     lastThroughput = 0.0;
96     bestThroughput = 0.0;
97     bestTarget = aggregationTarget;
98     lastEvents = 0;
99     lastMeasurementTime = System.currentTimeMillis();
100     measurementCount = adjustCount = 0;
101 
102     mgr.getProfiler().add("AggThrottle throughput for <"+name+">",
103 	new ProfilableIF() {
104 	public int profileSize() {
105 	//int foo = getAggTarget(); // Recalculate
106 	return (int)lastThroughput;
107 	}
108 	});
109     mgr.getProfiler().add("AggThrottle bestThroughput for <"+name+">",
110 	new ProfilableIF() {
111 	public int profileSize() {
112 	//int foo = getAggTarget(); // Recalculate
113 	return (int)bestThroughput;
114 	}
115 	});
116     mgr.getProfiler().add("AggThrottle aggTarget for <"+name+">",
117 	new ProfilableIF() {
118 	public int profileSize() {
119 	//int foo = getAggTarget(); // Recalculate
120 	return aggregationTarget;
121 	}
122 	});
123   }
124 
toString()125   public String toString() {
126     return "AggThrottle <"+name+">";
127   }
128 
getAggTarget()129   synchronized int getAggTarget() {
130 
131     long cur_time = System.currentTimeMillis();
132     long time_elapsed = cur_time - lastMeasurementTime;
133 
134     if (time_elapsed < recalcWindow) {
135       return aggregationTarget;
136     }
137 
138 //    measurementCount++;
139 //    if ((measurementCount % recalcWindow) != 0) {
140 //      return aggregationTarget;
141 //    }
142 
143     long events = stage.getStats().getTotalEvents();
144     long curEvents = events - lastEvents;
145     lastEvents = events;
146 
147     lastMeasurementTime = cur_time;
148 
149     double throughput = (curEvents * 1.0) / ((double)time_elapsed * 1.0e-3);
150     double avgThroughput = (smoothConst * lastThroughput) + ((1.0 - smoothConst) * throughput);
151 
152     adjustCount++;
153     if ((adjustCount % ADJUST_DELAY) == 0) {
154 
155       if (avgThroughput < (VERY_LOW_WATER*bestThroughput)) {
156    	aggregationTarget = maxAggregation;
157     	state = STATE_DECREASING;
158       }
159 
160       if (avgThroughput >= (VERY_HIGH_WATER*bestThroughput)) {
161 	aggregationTarget = maxAggregation;
162 	state = STATE_DECREASING;
163       }
164 
165       if (state == STATE_DECREASING) {
166 	if (avgThroughput <= (LOW_WATER*bestThroughput)) {
167 	  // Fell below low water - increase
168 	  //bestThroughput = avgThroughput;
169 	  state = STATE_INCREASING;
170 	  aggregationTarget *= INCREASE_FACTOR;
171 	  if (aggregationTarget > maxAggregation) aggregationTarget = maxAggregation;
172 	} else if (avgThroughput > bestThroughput) {
173 	  // Better throughput - save and decrease
174 	  bestThroughput = avgThroughput;
175 	  aggregationTarget /= REDUCE_FACTOR;
176 	  if (aggregationTarget < minAggregation) aggregationTarget = minAggregation;
177 	} else {
178 	  // Just decrease
179 	  aggregationTarget /= REDUCE_FACTOR;
180 	  if (aggregationTarget < minAggregation) aggregationTarget = minAggregation;
181 	}
182 
183       } else if (state == STATE_INCREASING) {
184 	if (avgThroughput > bestThroughput) {
185 	  // Better throughput - save
186 	  bestThroughput = avgThroughput;
187 	}
188 	if (avgThroughput >= (HIGH_WATER*bestThroughput)) {
189 	  // Start decreasing
190 	  state = STATE_DECREASING;
191 	  aggregationTarget /= REDUCE_FACTOR;
192 	  if (aggregationTarget < minAggregation) aggregationTarget = minAggregation;
193 //	} else if (avgThroughput <= (LOW_WATER*bestThroughput)) {
194 	  // Fell below low water - decrease
195 	  //bestThroughput = avgThroughput;
196 //	  state = STATE_DECREASING;
197 //	  aggregationTarget /= REDUCE_FACTOR;
198 //	  if (aggregationTarget < minAggregation) aggregationTarget = minAggregation;
199 	} else {
200 	  // Just increase
201 	  aggregationTarget *= INCREASE_FACTOR;
202 	  if (aggregationTarget > maxAggregation) {
203 	    // Maxed out, so save best throughput and start decreasing
204 	    aggregationTarget = maxAggregation;
205 	    state = STATE_DECREASING;
206 	    bestThroughput = avgThroughput;
207 	  }
208 	}
209       }
210 
211       // Randomly reset best estimate if not below LOW_WATER
212 //      if (rand.nextDouble() <= 0.2) {
213 //	if (avgThroughput >= (LOW_WATER*bestThroughput)) {
214 //	  bestThroughput = avgThroughput;
215 //	}
216  //     }
217 
218       // Randomly switch direction
219       if (rand.nextDouble() <= 0.0) {
220 	if (state == STATE_INCREASING) {
221 	  state = STATE_DECREASING;
222 	  aggregationTarget /= REDUCE_FACTOR;
223 	  if (aggregationTarget < minAggregation) aggregationTarget = minAggregation;
224 	} else {
225 	  state = STATE_INCREASING;
226 	  aggregationTarget *= INCREASE_FACTOR;
227 	  if (aggregationTarget > maxAggregation) aggregationTarget = maxAggregation;
228 	}
229       }
230 
231       // Randomly reset
232       if (rand.nextDouble() <= 0.00) {
233 	state = STATE_DECREASING;
234    	aggregationTarget = maxAggregation;
235 	bestThroughput = 0.0;
236       }
237     }
238 
239     if (DEBUG) System.err.println("AggThrottle <"+name+">: avgThroughput "+MDWUtil.format(avgThroughput)+", last "+MDWUtil.format(lastThroughput)+", state "+((state==0)?"dec":"inc")+", aggTarget "+aggregationTarget);
240 
241     //if ((adjustCount % ADJUST_DELAY) == 0) lastThroughput = avgThroughput;
242     lastThroughput = avgThroughput;
243     return aggregationTarget;
244   }
245 
246 }
247 
248