1 // ========================================================================
2 // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3 // ------------------------------------------------------------------------
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 //========================================================================
14 
15 package org.mortbay.cometd;
16 
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Queue;
21 
22 import org.cometd.Bayeux;
23 import org.cometd.Client;
24 import org.cometd.DeliverListener;
25 import org.cometd.Extension;
26 import org.cometd.ClientListener;
27 import org.cometd.Message;
28 import org.cometd.MessageListener;
29 import org.cometd.QueueListener;
30 import org.cometd.RemoveListener;
31 import org.mortbay.util.ArrayQueue;
32 import org.mortbay.util.LazyList;
33 import org.mortbay.util.ajax.JSON;
34 
35 
36 
37 /* ------------------------------------------------------------ */
38 /**
39  *
40  * @author gregw
41  */
42 public class ClientImpl implements Client
43 {
44     private String _id;
45     private String _type;
46     private int _responsesPending;
47     private ChannelImpl[] _subscriptions=new ChannelImpl[0]; // copy on write
48     private boolean _JSONCommented;
49     private RemoveListener[] _rListeners; // copy on write
50     private MessageListener[] _syncMListeners; // copy on write
51     private MessageListener[] _asyncMListeners; // copy on write
52     private QueueListener[] _qListeners; // copy on write
53     private DeliverListener[] _dListeners; // copy on write
54     protected AbstractBayeux _bayeux;
55     private String _browserId;
56     private JSON.Literal _advice;
57     private int _batch;
58     private int _maxQueue;
59     private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
60     private long _timeout;
61 
62     // manipulated and synchronized by AbstractBayeux
63     int _adviseVersion;
64 
65     /* ------------------------------------------------------------ */
ClientImpl(AbstractBayeux bayeux)66     protected ClientImpl(AbstractBayeux bayeux)
67     {
68         _bayeux=bayeux;
69         _maxQueue=bayeux.getMaxClientQueue();
70         _bayeux.addClient(this,null);
71         if (_bayeux.isLogInfo())
72             _bayeux.logInfo("newClient: "+this);
73     }
74 
75     /* ------------------------------------------------------------ */
ClientImpl(AbstractBayeux bayeux, String idPrefix)76     protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
77     {
78         _bayeux=bayeux;
79         _maxQueue=0;
80 
81         _bayeux.addClient(this,idPrefix);
82 
83         if (_bayeux.isLogInfo())
84             _bayeux.logInfo("newClient: "+this);
85 
86     }
87 
88     /* ------------------------------------------------------------ */
deliver(Client from, String toChannel, Object data, String id)89     public void deliver(Client from, String toChannel, Object data, String id)
90     {
91         // TODO recycle maps
92         Message message=_bayeux.newMessage();
93         message.put(Bayeux.CHANNEL_FIELD,toChannel);
94         message.put(Bayeux.DATA_FIELD,data);
95         if (id!=null)
96             message.put(Bayeux.ID_FIELD,id);
97 
98         for (Extension e:_bayeux._extensions)
99             message=e.send(message);
100         doDelivery(from,message);
101 
102         ((MessageImpl)message).decRef();
103     }
104 
105     /* ------------------------------------------------------------ */
doDelivery(Client from, Message message)106     protected void doDelivery(Client from, Message message)
107     {
108         MessageListener[] alisteners=null;
109         synchronized(this)
110         {
111             ((MessageImpl)message).incRef();
112 
113             if (_maxQueue<0)
114             {
115                 _queue.addUnsafe(message);
116             }
117             else
118             {
119                 boolean add=_maxQueue>0;
120                 if (_queue.size()>=_maxQueue && _qListeners!=null)
121                 {
122                     for (QueueListener l : _qListeners)
123                     {
124                         add &= l.queueMaxed((Client)this,message);
125                     }
126                 }
127 
128                 if (add)
129                     _queue.addUnsafe(message);
130             }
131 
132             // deliver unsynchronized
133             if (_syncMListeners!=null)
134                 for (MessageListener l:_syncMListeners)
135                     l.deliver(from,this,message);
136             alisteners=_asyncMListeners;
137 
138             if (_batch==0 &&  _responsesPending<1 && _queue.size()>0)
139                 resume();
140         }
141 
142         // deliver unsynchronized
143         if (alisteners!=null)
144             for (MessageListener l:alisteners)
145                 l.deliver(from,this,message);
146     }
147 
148     /* ------------------------------------------------------------ */
doDeliverListeners()149     public void doDeliverListeners()
150     {
151         synchronized (this)
152         {
153             if (_dListeners!=null)
154                 for (DeliverListener l:_dListeners)
155                     l.deliver(this,_queue);
156         }
157     }
158 
159 
160     /* ------------------------------------------------------------ */
startBatch()161     public void startBatch()
162     {
163         synchronized(this)
164         {
165             _batch++;
166         }
167     }
168 
169     /* ------------------------------------------------------------ */
endBatch()170     public void endBatch()
171     {
172         synchronized(this)
173         {
174             if (--_batch==0 && _queue.size()>0 && _responsesPending<1)
175                 resume();
176         }
177     }
178 
179     /* ------------------------------------------------------------ */
getConnectionType()180     public String getConnectionType()
181     {
182         return _type;
183     }
184 
185     /* ------------------------------------------------------------ */
186     /* (non-Javadoc)
187      * @see org.mortbay.cometd.C#getId()
188      */
getId()189     public String getId()
190     {
191         return _id;
192     }
193 
194     /* ------------------------------------------------------------ */
hasMessages()195     public boolean hasMessages()
196     {
197         return _queue.size()>0;
198     }
199 
200     /* ------------------------------------------------------------ */
201     /**
202      * @return the commented
203      */
isJSONCommented()204     public boolean isJSONCommented()
205     {
206         synchronized(this)
207         {
208             return _JSONCommented;
209         }
210     }
211 
212     /* ------------------------------------------------------------ */
isLocal()213     public boolean isLocal()
214     {
215         return true;
216     }
217 
218     /* ------------------------------------------------------------ */
219     /* ------------------------------------------------------------ */
220     /* (non-Javadoc)
221      * @see dojox.cometd.Client#remove(boolean)
222      */
remove(boolean timeout)223     public void remove(boolean timeout)
224     {
225         synchronized(this)
226         {
227             Client client=_bayeux.removeClient(_id);
228             if (_bayeux.isLogInfo())
229                 _bayeux.logInfo("Remove client "+client+" timeout="+timeout);
230             if (_browserId!=null)
231                 _bayeux.clientOffBrowser(getBrowserId(),_id);
232             _browserId=null;
233 
234             if (_rListeners!=null)
235                 for (RemoveListener l:_rListeners)
236                     l.removed(_id, timeout);
237         }
238         resume();
239     }
240 
241     /* ------------------------------------------------------------ */
responded()242     public int responded()
243     {
244         synchronized(this)
245         {
246             return _responsesPending--;
247         }
248     }
249 
250     /* ------------------------------------------------------------ */
responsePending()251     public int responsePending()
252     {
253         synchronized(this)
254         {
255             return ++_responsesPending;
256         }
257     }
258 
259     /* ------------------------------------------------------------ */
260     /** Called by deliver to resume anything waiting on this client.
261      */
resume()262     public void resume()
263     {
264     }
265 
266     /* ------------------------------------------------------------ */
267     /**
268      * @param commented the commented to set
269      */
setJSONCommented(boolean commented)270     public void setJSONCommented(boolean commented)
271     {
272         synchronized(this)
273         {
274             _JSONCommented=commented;
275         }
276     }
277 
278     /* ------------------------------------------------------------ */
279     /*
280      * @return the number of messages queued
281      */
getMessages()282     public int getMessages()
283     {
284         return _queue.size();
285     }
286 
287     /* ------------------------------------------------------------ */
takeMessages()288     public List<Message> takeMessages()
289     {
290         synchronized(this)
291         {
292             ArrayList<Message> list = new ArrayList<Message>(_queue);
293             _queue.clear();
294             return list;
295         }
296     }
297 
298 
299     /* ------------------------------------------------------------ */
returnMessages(List<Message> messages)300     public void returnMessages(List<Message> messages)
301     {
302         synchronized(this)
303         {
304             _queue.addAll(0,messages);
305         }
306     }
307 
308     /* ------------------------------------------------------------ */
309     @Override
toString()310     public String toString()
311     {
312         return _id;
313     }
314 
315     /* ------------------------------------------------------------ */
addSubscription(ChannelImpl channel)316     protected void addSubscription(ChannelImpl channel)
317     {
318         synchronized (this)
319         {
320             _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
321         }
322     }
323 
324     /* ------------------------------------------------------------ */
removeSubscription(ChannelImpl channel)325     protected void removeSubscription(ChannelImpl channel)
326     {
327         synchronized (this)
328         {
329             _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
330         }
331     }
332 
333     /* ------------------------------------------------------------ */
setConnectionType(String type)334     protected void setConnectionType(String type)
335     {
336         synchronized (this)
337         {
338             _type=type;
339         }
340     }
341 
342     /* ------------------------------------------------------------ */
setId(String _id)343     protected void setId(String _id)
344     {
345         synchronized (this)
346         {
347             this._id=_id;
348         }
349     }
350 
351     /* ------------------------------------------------------------ */
unsubscribeAll()352     protected void unsubscribeAll()
353     {
354         ChannelImpl[] subscriptions;
355         synchronized(this)
356         {
357             _queue.clear();
358             subscriptions=_subscriptions;
359             _subscriptions=new ChannelImpl[0];
360         }
361         for (ChannelImpl channel : subscriptions)
362             channel.unsubscribe(this);
363 
364     }
365 
366     /* ------------------------------------------------------------ */
setBrowserId(String id)367     public void setBrowserId(String id)
368     {
369         if (_browserId!=null && !_browserId.equals(id))
370             _bayeux.clientOffBrowser(_browserId,_id);
371         _browserId=id;
372         if (_browserId!=null)
373             _bayeux.clientOnBrowser(_browserId,_id);
374     }
375 
376     /* ------------------------------------------------------------ */
getBrowserId()377     public String getBrowserId()
378     {
379         return _browserId;
380     }
381 
382     /* ------------------------------------------------------------ */
383     @Override
equals(Object o)384     public boolean equals(Object o)
385     {
386     	if (!(o instanceof Client))
387     		return false;
388     	return getId().equals(((Client)o).getId());
389     }
390 
391     /* ------------------------------------------------------------ */
392     /**
393      * Get the advice specific for this Client
394      * @return advice specific for this client or null
395      */
getAdvice()396     public JSON.Literal getAdvice()
397     {
398     	return _advice;
399     }
400 
401     /* ------------------------------------------------------------ */
402     /**
403      * @param advice specific for this client
404      */
setAdvice(JSON.Literal advice)405     public void setAdvice(JSON.Literal advice)
406     {
407     	_advice=advice;
408     }
409 
410 
411     /* ------------------------------------------------------------ */
addListener(ClientListener listener)412     public void addListener(ClientListener listener)
413     {
414     	synchronized(this)
415     	{
416     		if (listener instanceof MessageListener)
417     		{
418     			if (listener instanceof MessageListener.Synchronous)
419     				_syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
420     			else
421     				_asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
422     		}
423 
424     		if (listener instanceof RemoveListener)
425     			_rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
426 
427     		if (listener instanceof QueueListener)
428     		    _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
429 
430                 if (listener instanceof DeliverListener)
431                     _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
432     	}
433     }
434 
435     /* ------------------------------------------------------------ */
removeListener(ClientListener listener)436     public void removeListener(ClientListener listener)
437     {
438     	synchronized(this)
439     	{
440     		if (listener instanceof MessageListener)
441     		{
442     			_syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
443     			_asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
444     		}
445 
446     		if (listener instanceof RemoveListener)
447     			_rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
448 
449     		if (listener instanceof QueueListener)
450     		    _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
451     	}
452     }
453 
454     /* ------------------------------------------------------------ */
getTimeout()455     public long getTimeout()
456     {
457     	return _timeout;
458     }
459 
460     /* ------------------------------------------------------------ */
setTimeout(long timeoutMS)461     public void setTimeout(long timeoutMS)
462     {
463     	_timeout=timeoutMS;
464     }
465 
466     /* ------------------------------------------------------------ */
setMaxQueue(int maxQueue)467     public void setMaxQueue(int maxQueue)
468     {
469         _maxQueue=maxQueue;
470     }
471 
472     /* ------------------------------------------------------------ */
getMaxQueue()473     public int getMaxQueue()
474     {
475         return _maxQueue;
476     }
477 
478     /* ------------------------------------------------------------ */
getQueue()479     public Queue<Message> getQueue()
480     {
481         return _queue;
482     }
483 }
484