1 /* 2 * Copyright (c) 2001 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.internal; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.api.internal.*; 29 import java.util.*; 30 31 /** 32 * Used as a proxy to observe and measure communication behavior between 33 * stages. By handing out a SinkProxy instead of a FiniteQueue, it is 34 * possible to gather statistics on event communication between stages. 35 * This is used by StageGraph to construct a graph of the communication 36 * patterns between stages. 37 * 38 * @author Matt Welsh 39 */ 40 public class SinkProxy implements SinkIF, ProfilableIF { 41 42 private static final boolean DEBUG = false; 43 44 private ManagerIF mgr; 45 private StageWrapperIF toStage; 46 private StageGraph stageGraph; 47 public SinkIF thesink; 48 private Thread client = null; 49 private Hashtable clientTbl = null; 50 51 /** 52 * Maintains a running sum of the number of elements enqueued onto 53 * this sink. 54 */ 55 public int enqueueCount; 56 57 /** 58 * Maintains a running sum of the number of elements successfully 59 * enqueued onto this sink (that is, not rejected by the enqueue predicate). 60 */ 61 public int enqueueSuccessCount; 62 63 /** 64 * Used to maintain a timer for statistics gathering. 65 */ 66 public long timer; 67 68 /** 69 * Create a SinkProxy for the given sink. 70 * 71 * @param sink The sink to create a proxy for. 72 * @param mgr The associated manager. 73 * @param toStage The stage which this sink pushes events to. 74 */ SinkProxy(SinkIF sink, ManagerIF mgr, StageWrapperIF toStage)75 public SinkProxy(SinkIF sink, ManagerIF mgr, StageWrapperIF toStage) { 76 this.thesink = sink; 77 this.mgr = mgr; 78 this.stageGraph = mgr.getProfiler().getGraphProfiler(); 79 this.toStage = toStage; 80 this.enqueueCount = 0; 81 this.enqueueSuccessCount = 0; 82 this.timer = 0; 83 } 84 85 /** 86 * Return the size of the queue. 87 */ size()88 public int size() { 89 if (thesink == null) return 0; 90 return thesink.size(); 91 } 92 enqueue(QueueElementIF enqueueMe)93 public void enqueue(QueueElementIF enqueueMe) throws SinkException { 94 recordUse(); 95 enqueueCount++; 96 thesink.enqueue(enqueueMe); 97 enqueueSuccessCount++; 98 } 99 enqueue_lossy(QueueElementIF enqueueMe)100 public boolean enqueue_lossy(QueueElementIF enqueueMe) { 101 recordUse(); 102 enqueueCount++; 103 boolean pass = thesink.enqueue_lossy(enqueueMe); 104 if (pass) enqueueSuccessCount++; 105 return pass; 106 } 107 enqueue_many(QueueElementIF[] enqueueMe)108 public void enqueue_many(QueueElementIF[] enqueueMe) throws SinkException { 109 recordUse(); 110 if (enqueueMe != null) { 111 enqueueCount += enqueueMe.length; 112 } 113 thesink.enqueue_many(enqueueMe); 114 if (enqueueMe != null) { 115 enqueueSuccessCount += enqueueMe.length; 116 } 117 } 118 119 /** 120 * Return the profile size of the queue. 121 */ profileSize()122 public int profileSize() { 123 return size(); 124 } 125 enqueue_prepare(QueueElementIF enqueueMe[])126 public Object enqueue_prepare(QueueElementIF enqueueMe[]) throws SinkException { 127 recordUse(); 128 if (enqueueMe != null) { 129 enqueueCount += enqueueMe.length; 130 } 131 Object key = thesink.enqueue_prepare(enqueueMe); 132 if (enqueueMe != null) { 133 enqueueSuccessCount += enqueueMe.length; 134 } 135 return key; 136 } 137 enqueue_commit(Object key)138 public void enqueue_commit(Object key) { 139 thesink.enqueue_commit(key); 140 } 141 enqueue_abort(Object key)142 public void enqueue_abort(Object key) { 143 thesink.enqueue_abort(key); 144 } 145 setEnqueuePredicate(EnqueuePredicateIF pred)146 public void setEnqueuePredicate(EnqueuePredicateIF pred) { 147 thesink.setEnqueuePredicate(pred); 148 } 149 getEnqueuePredicate()150 public EnqueuePredicateIF getEnqueuePredicate() { 151 return thesink.getEnqueuePredicate(); 152 } 153 toString()154 public String toString() { 155 return "[SinkProxy for toStage="+toStage+"]"; 156 } 157 recordUse()158 private void recordUse() { 159 if (DEBUG) System.err.println("SinkProxy: Recording use of "+this+" by thread "+Thread.currentThread()); 160 161 if (client == null) { 162 client = Thread.currentThread(); 163 164 StageGraphEdge edge = new StageGraphEdge(); 165 edge.fromStage = stageGraph.getStageFromThread(client); 166 edge.toStage = toStage; 167 edge.sink = this; 168 stageGraph.addEdge(edge); 169 170 } else { 171 Thread t = Thread.currentThread(); 172 if (client != t) { 173 if (clientTbl == null) clientTbl = new Hashtable(); 174 if (clientTbl.get(t) == null) { 175 clientTbl.put(t, t); 176 177 StageGraphEdge edge = new StageGraphEdge(); 178 edge.fromStage = stageGraph.getStageFromThread(t); 179 edge.toStage = toStage; 180 edge.sink = this; 181 stageGraph.addEdge(edge); 182 } 183 } 184 } 185 } 186 187 } 188