1 /*
2  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.internal;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.api.internal.*;
29 import java.util.*;
30 
31 /**
32  * Used as a proxy to observe and measure communication behavior between
33  * stages. By handing out a SinkProxy instead of a FiniteQueue, it is
34  * possible to gather statistics on event communication between stages.
35  * This is used by StageGraph to construct a graph of the communication
36  * patterns between stages.
37  *
38  * @author Matt Welsh
39  */
40 public class SinkProxy implements SinkIF, ProfilableIF {
41 
42   private static final boolean DEBUG = false;
43 
44   private ManagerIF mgr;
45   private StageWrapperIF toStage;
46   private StageGraph stageGraph;
47   public SinkIF thesink;
48   private Thread client = null;
49   private Hashtable clientTbl = null;
50 
51   /**
52    * Maintains a running sum of the number of elements enqueued onto
53    * this sink.
54    */
55   public int enqueueCount;
56 
57   /**
58    * Maintains a running sum of the number of elements successfully
59    * enqueued onto this sink (that is, not rejected by the enqueue predicate).
60    */
61   public int enqueueSuccessCount;
62 
63   /**
64    * Used to maintain a timer for statistics gathering.
65    */
66   public long timer;
67 
68   /**
69    * Create a SinkProxy for the given sink.
70    *
71    * @param sink The sink to create a proxy for.
72    * @param mgr The associated manager.
73    * @param toStage The stage which this sink pushes events to.
74    */
SinkProxy(SinkIF sink, ManagerIF mgr, StageWrapperIF toStage)75   public SinkProxy(SinkIF sink, ManagerIF mgr, StageWrapperIF toStage) {
76     this.thesink = sink;
77     this.mgr = mgr;
78     this.stageGraph = mgr.getProfiler().getGraphProfiler();
79     this.toStage = toStage;
80     this.enqueueCount = 0;
81     this.enqueueSuccessCount = 0;
82     this.timer = 0;
83   }
84 
85   /**
86    * Return the size of the queue.
87    */
size()88   public int size() {
89     if (thesink == null) return 0;
90     return thesink.size();
91   }
92 
enqueue(QueueElementIF enqueueMe)93   public void enqueue(QueueElementIF enqueueMe) throws SinkException {
94     recordUse();
95     enqueueCount++;
96     thesink.enqueue(enqueueMe);
97     enqueueSuccessCount++;
98   }
99 
enqueue_lossy(QueueElementIF enqueueMe)100   public boolean enqueue_lossy(QueueElementIF enqueueMe) {
101     recordUse();
102     enqueueCount++;
103     boolean pass = thesink.enqueue_lossy(enqueueMe);
104     if (pass) enqueueSuccessCount++;
105     return pass;
106   }
107 
enqueue_many(QueueElementIF[] enqueueMe)108   public void enqueue_many(QueueElementIF[] enqueueMe) throws SinkException {
109     recordUse();
110     if (enqueueMe != null) {
111       enqueueCount += enqueueMe.length;
112     }
113     thesink.enqueue_many(enqueueMe);
114     if (enqueueMe != null) {
115       enqueueSuccessCount += enqueueMe.length;
116     }
117   }
118 
119   /**
120    * Return the profile size of the queue.
121    */
profileSize()122   public int profileSize() {
123     return size();
124   }
125 
enqueue_prepare(QueueElementIF enqueueMe[])126   public Object enqueue_prepare(QueueElementIF enqueueMe[]) throws SinkException {
127     recordUse();
128     if (enqueueMe != null) {
129       enqueueCount += enqueueMe.length;
130     }
131     Object key = thesink.enqueue_prepare(enqueueMe);
132     if (enqueueMe != null) {
133       enqueueSuccessCount += enqueueMe.length;
134     }
135     return key;
136   }
137 
enqueue_commit(Object key)138   public void enqueue_commit(Object key) {
139     thesink.enqueue_commit(key);
140   }
141 
enqueue_abort(Object key)142   public void enqueue_abort(Object key) {
143     thesink.enqueue_abort(key);
144   }
145 
setEnqueuePredicate(EnqueuePredicateIF pred)146   public void setEnqueuePredicate(EnqueuePredicateIF pred) {
147     thesink.setEnqueuePredicate(pred);
148   }
149 
getEnqueuePredicate()150   public EnqueuePredicateIF getEnqueuePredicate() {
151     return thesink.getEnqueuePredicate();
152   }
153 
toString()154   public String toString() {
155     return "[SinkProxy for toStage="+toStage+"]";
156   }
157 
recordUse()158   private void recordUse() {
159     if (DEBUG) System.err.println("SinkProxy: Recording use of "+this+" by thread "+Thread.currentThread());
160 
161     if (client == null) {
162       client = Thread.currentThread();
163 
164       StageGraphEdge edge = new StageGraphEdge();
165       edge.fromStage = stageGraph.getStageFromThread(client);
166       edge.toStage = toStage;
167       edge.sink = this;
168       stageGraph.addEdge(edge);
169 
170     } else {
171       Thread t = Thread.currentThread();
172       if (client != t) {
173 	if (clientTbl == null) clientTbl = new Hashtable();
174 	if (clientTbl.get(t) == null) {
175 	  clientTbl.put(t, t);
176 
177 	  StageGraphEdge edge = new StageGraphEdge();
178 	  edge.fromStage = stageGraph.getStageFromThread(t);
179 	  edge.toStage = toStage;
180 	  edge.sink = this;
181 	  stageGraph.addEdge(edge);
182 	}
183       }
184     }
185   }
186 
187 }
188