1 // ========================================================================
2 // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3 // ------------------------------------------------------------------------
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 //========================================================================
14 
15 package org.mortbay.cometd;
16 
17 import java.io.IOException;
18 import java.security.SecureRandom;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.ListIterator;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 
31 import javax.servlet.ServletContext;
32 import javax.servlet.http.HttpServletRequest;
33 
34 import org.cometd.Bayeux;
35 import org.cometd.BayeuxListener;
36 import org.cometd.Channel;
37 import org.cometd.ChannelBayeuxListener;
38 import org.cometd.Client;
39 import org.cometd.ClientBayeuxListener;
40 import org.cometd.Extension;
41 import org.cometd.Message;
42 import org.cometd.SecurityPolicy;
43 import org.mortbay.util.ajax.JSON;
44 
45 
46 /* ------------------------------------------------------------ */
47 /**
48  * @author gregw
49  * @author aabeling: added JSONP transport
50  *
51  */
52 public abstract class AbstractBayeux extends MessagePool implements Bayeux
53 {
    // Pre-built ChannelId instances for the Bayeux meta channel names.
    public static final ChannelId META_ID=new ChannelId(META);
    public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
    public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
    public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
    public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
    public static final ChannelId META_PING_ID=new ChannelId(META_PING);
    public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
    public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);
    public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);


    // Single-entry map {"json-comment-filtered": true}.
    // NOTE(review): presumably sent as the "ext" field of a reply - confirm against its users.
    private static final Map<String,Object> EXT_JSON_COMMENTED=new HashMap<String,Object>(2){
        {
            this.put("json-comment-filtered",Boolean.TRUE);
        }
    };


    // Channel name -> handler; filled by the constructor with the meta channel handlers.
    private HashMap<String,Handler> _handlers=new HashMap<String,Handler>();

    // Root ("/") of the channel tree; all channel lookups go through it.
    private ChannelImpl _root = new ChannelImpl("/",this);
    // Registered clients keyed by client id (see addClient/removeClient).
    private ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
    protected SecurityPolicy _securityPolicy=new DefaultPolicy();
    // Current advice and its multi-frame variant; both are replaced together by setAdvice().
    protected JSON.Literal _advice;
    protected JSON.Literal _multiFrameAdvice;
    // Incremented on every setAdvice() call.
    protected int _adviceVersion=0;
    // Advice attached to "Unknown client" replies: tells the client to handshake again.
    protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
    // 0=none, 1=info, 2=debug
    protected int _logLevel;
    // Value of the "timeout" field of the generated advice.
    protected long _timeout=240000;
    // Value of the "interval" field of the generated advice.
    protected long _interval=0;
    // Maximum time in ms to wait between polls before timing out a client.
    protected long _maxInterval=30000;
    protected boolean _JSONCommented;
    // Set by initialize().
    protected boolean _initialized;
    // Browser id -> ids of the clients seen on that browser.
    protected ConcurrentHashMap<String, List<String>> _browser2client=new ConcurrentHashMap<String, List<String>>();
    // Reconnect interval used in the multi-frame advice; values <= 0 yield "reconnect":"none".
    protected int _multiFrameInterval=-1;

    // True if published messages are delivered directly to subscribers.
    protected boolean _directDeliver=true;
    protected boolean _requestAvailable;
    // Per-thread current request, see getCurrentRequest()/setCurrentRequest().
    protected ThreadLocal<HttpServletRequest> _request = new ThreadLocal<HttpServletRequest>();

    // The three fields below are assigned by initialize() and are null before it runs.
    transient ServletContext _context;
    transient Random _random;
    transient ConcurrentHashMap<String, ChannelId> _channelIdCache;
    // Handler for non-meta channels.
    protected Handler _publishHandler;
    // Handler invoked for every meta channel message, after any channel specific handler.
    protected Handler _metaPublishHandler;
    // NOTE(review): presumably -1 means "no limit" - confirm against ClientImpl.
    protected int _maxClientQueue=-1;

    protected List<Extension> _extensions=new CopyOnWriteArrayList<Extension>();
    // JSON array literal naming the long-poll and callback-poll transports.
    protected JSON.Literal _transports=new JSON.Literal("[\""+Bayeux.TRANSPORT_LONG_POLL+ "\",\""+Bayeux.TRANSPORT_CALLBACK_POLL+"\"]");
    protected List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
    protected List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();
105 
106     /* ------------------------------------------------------------ */
107     /**
108      * @param context.
109      *            The logLevel init parameter is used to set the logging to
110      *            0=none, 1=info, 2=debug
111      */
AbstractBayeux()112     protected AbstractBayeux()
113     {
114         _publishHandler=new PublishHandler();
115         _metaPublishHandler=new MetaPublishHandler();
116         _handlers.put(META_HANDSHAKE,new HandshakeHandler());
117         _handlers.put(META_CONNECT,new ConnectHandler());
118         _handlers.put(META_DISCONNECT,new DisconnectHandler());
119         _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
120         _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
121         _handlers.put(META_PING,new PingHandler());
122 
123         setTimeout(getTimeout());
124     }
125 
126     /* ------------------------------------------------------------ */
addExtension(Extension ext)127     public void addExtension(Extension ext)
128     {
129         _extensions.add(ext);
130     }
131 
132     /* ------------------------------------------------------------ */
getExtensions()133     public List<Extension> getExtensions()
134     {
135         // TODO - remove this hack of a method!
136         return _extensions;
137     }
138 
139     /* ------------------------------------------------------------ */
removeExtension(Extension ext)140     public void removeExtension(Extension ext)
141     {
142         _extensions.remove(ext);
143     }
144 
145     /* ------------------------------------------------------------ */
146     /**
147      * @param id
148      * @return
149      */
getChannel(ChannelId id)150     public ChannelImpl getChannel(ChannelId id)
151     {
152         return _root.getChild(id);
153     }
154 
155     /* ------------------------------------------------------------ */
getChannel(String id)156     public ChannelImpl getChannel(String id)
157     {
158         ChannelId cid=getChannelId(id);
159         if (cid.depth()==0)
160             return null;
161         return _root.getChild(cid);
162     }
163 
164     /* ------------------------------------------------------------ */
getChannel(String id, boolean create)165     public Channel getChannel(String id, boolean create)
166     {
167         synchronized(this)
168         {
169             ChannelImpl channel=getChannel(id);
170 
171             if (channel==null && create)
172             {
173                 channel=new ChannelImpl(id,this);
174                 _root.addChild(channel);
175 
176                 if (isLogInfo())
177                     logInfo("newChannel: "+channel);
178             }
179             return channel;
180         }
181     }
182 
183     /* ------------------------------------------------------------ */
getChannelId(String id)184     public ChannelId getChannelId(String id)
185     {
186         ChannelId cid = _channelIdCache.get(id);
187         if (cid==null)
188         {
189             // TODO shrink cache!
190             cid=new ChannelId(id);
191             _channelIdCache.put(id,cid);
192         }
193         return cid;
194     }
195 
196     /* ------------------------------------------------------------ */
197     /* (non-Javadoc)
198      * @see org.mortbay.cometd.Bx#getClient(java.lang.String)
199      */
getClient(String client_id)200     public Client getClient(String client_id)
201     {
202         synchronized(this)
203         {
204             if (client_id==null)
205                 return null;
206             Client client = _clients.get(client_id);
207             return client;
208         }
209     }
210 
211     /* ------------------------------------------------------------ */
getClientIDs()212     public Set<String> getClientIDs()
213     {
214         return _clients.keySet();
215     }
216 
217     /* ------------------------------------------------------------ */
218     /**
219      * @return The maximum time in ms to wait between polls before timing out a client
220      */
getMaxInterval()221     public long getMaxInterval()
222     {
223         return _maxInterval;
224     }
225 
226     /* ------------------------------------------------------------ */
227     /**
228      * @return the logLevel. 0=none, 1=info, 2=debug
229      */
getLogLevel()230     public int getLogLevel()
231     {
232         return _logLevel;
233     }
234 
235     /* ------------------------------------------------------------ */
236     /* (non-Javadoc)
237      * @see org.mortbay.cometd.Bx#getSecurityPolicy()
238      */
getSecurityPolicy()239     public SecurityPolicy getSecurityPolicy()
240     {
241         return _securityPolicy;
242     }
243 
244     /* ------------------------------------------------------------ */
getTimeout()245     public long getTimeout()
246     {
247         return _timeout;
248     }
249 
250     /* ------------------------------------------------------------ */
getInterval()251     public long getInterval()
252     {
253         return _interval;
254     }
255 
256     /* ------------------------------------------------------------ */
257     /**
258      * @return true if published messages are directly delivered to subscribers. False if
259      * a new message is to be created that holds only supported fields.
260      */
isDirectDeliver()261     public boolean isDirectDeliver()
262     {
263         return _directDeliver;
264     }
265 
266     /* ------------------------------------------------------------ */
267     /**
268      * @param directDeliver true if published messages are directly delivered to subscribers. False if
269      * a new message is to be created that holds only supported fields.
270      */
setDirectDeliver(boolean directDeliver)271     public void setDirectDeliver(boolean directDeliver)
272     {
273         _directDeliver = directDeliver;
274     }
275 
276     /* ------------------------------------------------------------ */
277     /** Handle a Bayeux message.
278      * This is normally only called by the bayeux servlet or a test harness.
279      * @param client The client if known
280      * @param transport The transport to use for the message
281      * @param message The bayeux message.
282      */
handle(ClientImpl client, Transport transport, Message message)283     public String handle(ClientImpl client, Transport transport, Message message) throws IOException
284     {
285         String channel_id=message.getChannel();
286 
287         Handler handler=(Handler)_handlers.get(channel_id);
288         if (handler!=null)
289         {
290             // known meta channel
291             ListIterator<Extension> iter = _extensions.listIterator(_extensions.size());
292             while(iter.hasPrevious())
293                 message=iter.previous().rcvMeta(message);
294 
295             handler.handle(client,transport,message);
296             _metaPublishHandler.handle(client,transport,message);
297         }
298         else if (channel_id.startsWith(META_SLASH))
299         {
300             // unknown meta channel
301             ListIterator<Extension> iter = _extensions.listIterator(_extensions.size());
302             while(iter.hasPrevious())
303                 message=iter.previous().rcvMeta(message);
304             _metaPublishHandler.handle(client,transport,message);
305         }
306         else
307         {
308             // non meta channel
309             handler=_publishHandler;
310             ListIterator<Extension> iter = _extensions.listIterator(_extensions.size());
311             while(iter.hasPrevious())
312                 message=iter.previous().rcv(message);
313             handler.handle(client,transport,message);
314         }
315 
316         return channel_id;
317     }
318 
319     /* ------------------------------------------------------------ */
hasChannel(String id)320     public boolean hasChannel(String id)
321     {
322         ChannelId cid=getChannelId(id);
323         return _root.getChild(cid)!=null;
324     }
325 
326     /* ------------------------------------------------------------ */
isInitialized()327     public boolean isInitialized()
328     {
329         return _initialized;
330     }
331 
332     /* ------------------------------------------------------------ */
333     /**
334      * @return the commented
335      */
isJSONCommented()336     public boolean isJSONCommented()
337     {
338         return _JSONCommented;
339     }
340 
341     /* ------------------------------------------------------------ */
isLogDebug()342     public boolean isLogDebug()
343     {
344         return _logLevel>1;
345     }
346 
347     /* ------------------------------------------------------------ */
isLogInfo()348     public boolean isLogInfo()
349     {
350         return _logLevel>0;
351     }
352 
353     /* ------------------------------------------------------------ */
logDebug(String message)354     public void logDebug(String message)
355     {
356         if (_logLevel>1)
357             _context.log(message);
358     }
359 
360     /* ------------------------------------------------------------ */
logDebug(String message, Throwable th)361     public void logDebug(String message, Throwable th)
362     {
363         if (_logLevel>1)
364             _context.log(message,th);
365     }
366 
367     /* ------------------------------------------------------------ */
logWarn(String message, Throwable th)368     public void logWarn(String message, Throwable th)
369     {
370         _context.log(message+": "+th.toString());
371     }
372 
373     /* ------------------------------------------------------------ */
logWarn(String message)374     public void logWarn(String message)
375     {
376         _context.log(message);
377     }
378 
379     /* ------------------------------------------------------------ */
logInfo(String message)380     public void logInfo(String message)
381     {
382         if (_logLevel>0)
383             _context.log(message);
384     }
385 
386     /* ------------------------------------------------------------ */
newClient(String idPrefix)387     public Client newClient(String idPrefix)
388     {
389         ClientImpl client = new ClientImpl(this,idPrefix);
390         return client;
391     }
392 
393     /* ------------------------------------------------------------ */
    /**
     * Factory method implemented by subclasses.
     * NOTE(review): presumably creates the client for a remote peer that has
     * just handshaken - confirm against the callers.
     * @return a new client implementation
     */
    public abstract ClientImpl newRemoteClient();
395 
396     /* ------------------------------------------------------------ */
397     /** Create new transport object for a bayeux message
398      * @param client The client
399      * @param message the bayeux message
400      * @return the negotiated transport.
401      */
newTransport(ClientImpl client, Map<?,?> message)402     public Transport newTransport(ClientImpl client, Map<?,?> message)
403     {
404         if (isLogDebug())
405             logDebug("newTransport: client="+client+",message="+message);
406 
407         Transport result=null;
408 
409         try
410         {
411             String type=client==null?null:client.getConnectionType();
412             if (type==null)
413                 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
414 
415             if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type) || type==null)
416             {
417                 String jsonp=(String)message.get(Bayeux.JSONP_PARAMETER);
418                 if(jsonp!=null)
419                     result=new JSONPTransport(client!=null&&client.isJSONCommented(),jsonp);
420                 else
421                     result=new JSONTransport(client!=null&&client.isJSONCommented());
422             }
423             else
424                 result=new JSONTransport(client!=null&&client.isJSONCommented());
425 
426         }
427         catch (Exception e)
428         {
429             throw new RuntimeException(e);
430         }
431 
432         if (isLogDebug())
433             logDebug("newTransport: result="+result);
434         return result;
435     }
436 
437     /* ------------------------------------------------------------ */
438     /** Publish data to a channel.
439      * Creates a message and delivers it to the root channel.
440      * @param to
441      * @param from
442      * @param data
443      * @param msgId
444      */
doPublish(ChannelId to, Client from, Object data, String msgId)445     protected void doPublish(ChannelId to, Client from, Object data, String msgId)
446     {
447         Message msg = newMessage();
448         msg.put(CHANNEL_FIELD,to.toString());
449 
450         if (msgId==null)
451         {
452             long id=msg.hashCode()
453             ^(to==null?0:to.hashCode())
454             ^(from==null?0:from.hashCode());
455             id=id<0?-id:id;
456             msg.put(ID_FIELD,Long.toString(id,36));
457         }
458         else
459             msg.put(ID_FIELD,msgId);
460 
461         msg.put(DATA_FIELD,data);
462 
463         for (Extension e:_extensions)
464             msg=e.send(msg);
465         _root.doDelivery(to,from,msg);
466         ((MessageImpl)msg).decRef();
467     }
468 
469     /* ------------------------------------------------------------ */
470     public boolean removeChannel(ChannelImpl channel)
471     {
472         boolean removed = _root.doRemove(channel);
473         if (removed)
474             for (ChannelBayeuxListener l : _channelListeners)
475                 l.channelRemoved(channel);
476         return removed;
477     }
478 
479     /* ------------------------------------------------------------ */
480     public void addChannel(ChannelImpl channel)
481     {
482         for (ChannelBayeuxListener l : _channelListeners)
483             l.channelAdded(channel);
484     }
485 
486     /* ------------------------------------------------------------ */
487     protected String newClientId(long variation, String idPrefix)
488     {
489         if (idPrefix==null)
490             return Long.toString(getRandom(),36)+Long.toString(variation,36);
491         else
492             return idPrefix+"_"+Long.toString(getRandom(),36);
493     }
494 
495     /* ------------------------------------------------------------ */
496     protected void addClient(ClientImpl client,String idPrefix)
497     {
498         while(true)
499         {
500             String id = newClientId(client.hashCode(),idPrefix);
501             client.setId(id);
502 
503             ClientImpl other = _clients.putIfAbsent(id,client);
504             if (other==null)
505             {
506                 for (ClientBayeuxListener l : _clientListeners)
507                     l.clientAdded((Client)client);
508 
509                 return;
510             }
511         }
512     }
513 
514     /* ------------------------------------------------------------ */
515     /* (non-Javadoc)
516      * @see org.mortbay.cometd.Bx#removeClient(java.lang.String)
517      */
518     public Client removeClient(String client_id)
519     {
520         ClientImpl client;
521         synchronized(this)
522         {
523             if (client_id==null)
524                 return null;
525             client = _clients.remove(client_id);
526         }
527         if (client!=null)
528         {
529             client.unsubscribeAll();
530             for (ClientBayeuxListener l : _clientListeners)
531                 l.clientRemoved((Client)client);
532         }
533         return client;
534     }
535 
536     /* ------------------------------------------------------------ */
537     /**
538      * @param ms The maximum time in ms to wait between polls before timing out a client
539      */
540     public void setMaxInterval(long ms)
541     {
542         _maxInterval=ms;
543     }
544 
545     /* ------------------------------------------------------------ */
546     /**
547      * @param commented the commented to set
548      */
549     public void setJSONCommented(boolean commented)
550     {
551         _JSONCommented=commented;
552     }
553 
554     /* ------------------------------------------------------------ */
555     /**
556      * @param logLevel
557      *            the logLevel: 0=none, 1=info, 2=debug
558      */
559     public void setLogLevel(int logLevel)
560     {
561         _logLevel=logLevel;
562     }
563 
564     /* ------------------------------------------------------------ */
565     /* (non-Javadoc)
566      * @see org.mortbay.cometd.Bx#setSecurityPolicy(org.mortbay.cometd.SecurityPolicy)
567      */
568     public void setSecurityPolicy(SecurityPolicy securityPolicy)
569     {
570         _securityPolicy=securityPolicy;
571     }
572 
573 
574     /* ------------------------------------------------------------ */
575     public void setTimeout(long ms)
576     {
577         _timeout = ms;
578         generateAdvice();
579     }
580 
581 
582     /* ------------------------------------------------------------ */
583     public void setInterval(long ms)
584     {
585         _interval = ms;
586         generateAdvice();
587     }
588 
589     /* ------------------------------------------------------------ */
590     /**
591      * The time a client should delay between reconnects when multiple
592      * connections from the same browser are detected. This effectively
593      * produces traditional polling.
594      * @param multiFrameInterval the multiFrameInterval to set
595      */
596     public void setMultiFrameInterval(int multiFrameInterval)
597     {
598         _multiFrameInterval=multiFrameInterval;
599         generateAdvice();
600     }
601 
602     /* ------------------------------------------------------------ */
603     /**
604      * @return the multiFrameInterval in milliseconds
605      */
606     public int getMultiFrameInterval()
607     {
608         return _multiFrameInterval;
609     }
610 
611     /* ------------------------------------------------------------ */
612     void generateAdvice()
613     {
614         setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":"+getInterval()+",\"timeout\":"+getTimeout()+"}"));
615     }
616 
617     /* ------------------------------------------------------------ */
618     public void setAdvice(JSON.Literal advice)
619     {
620         synchronized(this)
621         {
622             _adviceVersion++;
623             _advice=advice;
624             _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
625         }
626     }
627 
628     /* ------------------------------------------------------------ */
629     private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
630     {
631         Map<String,Object> a = (Map<String,Object>)JSON.parse(_advice.toString());
632         a.put("multiple-clients",Boolean.TRUE);
633         if (_multiFrameInterval>0)
634         {
635             a.put("reconnect","retry");
636             a.put("interval",_multiFrameInterval);
637         }
638         else
639             a.put("reconnect","none");
640         return a;
641     }
642 
643 
644 
645     /* ------------------------------------------------------------ */
getAdvice()646     public JSON.Literal getAdvice()
647     {
648         return _advice;
649     }
650 
651     /* ------------------------------------------------------------ */
652     /**
653      * @return TRUE if {@link #getCurrentRequest()} will return the current request
654      */
isRequestAvailable()655     public boolean isRequestAvailable()
656     {
657         return _requestAvailable;
658     }
659 
660     /* ------------------------------------------------------------ */
661     /**
662      * @param requestAvailable TRUE if {@link #getCurrentRequest()} will return the current request
663      */
setRequestAvailable(boolean requestAvailable)664     public void setRequestAvailable(boolean requestAvailable)
665     {
666         _requestAvailable=requestAvailable;
667     }
668 
669     /* ------------------------------------------------------------ */
670     /**
671      * @return the current request if {@link #isRequestAvailable()} is true, else null
672      */
getCurrentRequest()673     public HttpServletRequest getCurrentRequest()
674     {
675         return _request.get();
676     }
677 
678     /* ------------------------------------------------------------ */
679     /**
680      * @return the current request if {@link #isRequestAvailable()} is true, else null
681      */
setCurrentRequest(HttpServletRequest request)682     void setCurrentRequest(HttpServletRequest request)
683     {
684         _request.set(request);
685     }
686 
687 
688 
689     /* ------------------------------------------------------------ */
getChannels()690     public Collection<Channel> getChannels()
691     {
692         List<Channel> channels = new ArrayList<Channel>();
693         _root.getChannels(channels);
694         return channels;
695     }
696 
697     /* ------------------------------------------------------------ */
698     /**
699      * @return
700      */
getChannelCount()701     public int getChannelCount()
702     {
703         return _root.getChannelCount();
704     }
705 
706     /* ------------------------------------------------------------ */
getClients()707     public Collection<Client> getClients()
708     {
709         synchronized(this)
710         {
711             return new ArrayList<Client>(_clients.values());
712         }
713     }
714 
715     /* ------------------------------------------------------------ */
716     /**
717      * @return
718      */
getClientCount()719     public int getClientCount()
720     {
721         synchronized(this)
722         {
723             return _clients.size();
724         }
725     }
726 
727     /* ------------------------------------------------------------ */
hasClient(String clientId)728     public boolean hasClient(String clientId)
729     {
730         synchronized(this)
731         {
732             if (clientId==null)
733                 return false;
734             return _clients.containsKey(clientId);
735         }
736     }
737 
738     /* ------------------------------------------------------------ */
removeChannel(String channelId)739     public Channel removeChannel(String channelId)
740     {
741         Channel channel = getChannel(channelId);
742 
743         boolean removed = false;
744         if (channel!=null)
745             removed = channel.remove();
746 
747         if (removed)
748             return channel;
749         else
750             return null;
751     }
752 
753     /* ------------------------------------------------------------ */
initialize(ServletContext context)754     protected void initialize(ServletContext context)
755     {
756         synchronized(this)
757         {
758             _initialized=true;
759             _context=context;
760             try
761             {
762                 _random=SecureRandom.getInstance("SHA1PRNG");
763             }
764             catch (Exception e)
765             {
766                 context.log("Could not get secure random for ID generation",e);
767                 _random=new Random();
768             }
769             _random.setSeed(_random.nextLong()^hashCode()^(context.hashCode()<<32)^Runtime.getRuntime().freeMemory());
770             _channelIdCache=new ConcurrentHashMap<String, ChannelId>();
771 
772             _root.addChild(new ServiceChannel(Bayeux.SERVICE));
773 
774         }
775     }
776 
777     /* ------------------------------------------------------------ */
getRandom()778     long getRandom()
779     {
780         long l=_random.nextLong();
781         return l<0?-l:l;
782     }
783 
784     /* ------------------------------------------------------------ */
clientOnBrowser(String browserId,String clientId)785     void clientOnBrowser(String browserId,String clientId)
786     {
787         List<String> clients=_browser2client.get(browserId);
788         if (clients==null)
789         {
790             List<String> new_clients=new CopyOnWriteArrayList<String>();
791             clients=_browser2client.putIfAbsent(browserId,new_clients);
792             if (clients==null)
793                 clients=new_clients;
794         }
795         clients.add(clientId);
796     }
797 
798     /* ------------------------------------------------------------ */
clientOffBrowser(String browserId,String clientId)799     void clientOffBrowser(String browserId,String clientId)
800     {
801         List<String> clients=_browser2client.get(browserId);
802         if (clients!=null)
803             clients.remove(clientId);
804     }
805 
806     /* ------------------------------------------------------------ */
clientsOnBrowser(String browserId)807     List<String> clientsOnBrowser(String browserId)
808     {
809         List<String> clients=_browser2client.get(browserId);
810         if (clients==null)
811             return Collections.emptyList();
812         return clients;
813     }
814 
815     /* ------------------------------------------------------------ */
addListener(BayeuxListener listener)816     public void addListener(BayeuxListener listener)
817     {
818         if (listener instanceof ClientBayeuxListener)
819             _clientListeners.add((ClientBayeuxListener)listener);
820         else if(listener instanceof ChannelBayeuxListener)
821             _channelListeners.add((ChannelBayeuxListener)listener);
822     }
823 
824     /* ------------------------------------------------------------ */
getMaxClientQueue()825     public int getMaxClientQueue()
826     {
827         return _maxClientQueue;
828     }
829 
830     /* ------------------------------------------------------------ */
setMaxClientQueue(int size)831     public void setMaxClientQueue(int size)
832     {
833         _maxClientQueue=size;
834     }
835 
836 
837 
838     /* ------------------------------------------------------------ */
839     /* ------------------------------------------------------------ */
840     public static class DefaultPolicy implements SecurityPolicy
841     {
canHandshake(Message message)842         public boolean canHandshake(Message message)
843         {
844             return true;
845         }
846 
canCreate(Client client, String channel, Message message)847         public boolean canCreate(Client client, String channel, Message message)
848         {
849             return client!=null && !channel.startsWith(Bayeux.META_SLASH);
850         }
851 
canSubscribe(Client client, String channel, Message message)852         public boolean canSubscribe(Client client, String channel, Message message)
853         {
854 	    if (client!=null && ("/**".equals(channel) || "/*".equals(channel)))
855 	        return false;
856             return client!=null && !channel.startsWith(Bayeux.META_SLASH);
857         }
858 
canPublish(Client client, String channel, Message message)859         public boolean canPublish(Client client, String channel, Message message)
860         {
861             return client!=null || client==null && Bayeux.META_HANDSHAKE.equals(channel);
862         }
863 
864     }
865 
866 
867     /* ------------------------------------------------------------ */
868     /* ------------------------------------------------------------ */
869     protected abstract class Handler
870     {
871         abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
872         abstract ChannelId getMetaChannelId();
unknownClient(Transport transport,String channel)873         void unknownClient(Transport transport,String channel) throws IOException
874         {
875             MessageImpl reply=newMessage();
876 
877             reply.put(CHANNEL_FIELD,channel);
878             reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
879             reply.put(ERROR_FIELD,"402::Unknown client");
880             reply.put("advice",_handshakeAdvice);
881             transport.send(reply);
882         }
883     }
884 
885     /* ------------------------------------------------------------ */
886     /* ------------------------------------------------------------ */
887     protected class ConnectHandler extends Handler
888     {
889         protected String _metaChannel=META_CONNECT;
890 
891         @Override
getMetaChannelId()892         ChannelId getMetaChannelId()
893         {
894             return META_CONNECT_ID;
895         }
896 
897         @Override
handle(ClientImpl client, Transport transport, Message message)898         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
899         {
900             if (client==null)
901             {
902                 unknownClient(transport,_metaChannel);
903                 return;
904             }
905 
906             // is this the first connect message?
907             String type=client.getConnectionType();
908             boolean polling=true;
909             if (type==null)
910             {
911                 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
912                 client.setConnectionType(type);
913                 polling=false;
914             }
915 
916             Object advice = message.get(ADVICE_FIELD);
917             if (advice!=null)
918             {
919                 Long timeout=(Long)((Map)advice).get("timeout");
920                 if (timeout!=null && timeout.longValue()>0)
921                     client.setTimeout(timeout.longValue());
922                 else
923                     client.setTimeout(0);
924             }
925             else
926                 client.setTimeout(0);
927 
928             advice=null;
929 
930             // Work out if multiple clients from some browser?
931             if (polling && _multiFrameInterval>0 && client.getBrowserId()!=null)
932             {
933                 List<String> clients=clientsOnBrowser(client.getBrowserId());
934                 int count=clients.size();
935                 if (count>1)
936                 {
937                     polling=clients.get(0).equals(client.getId());
938                     advice=client.getAdvice();
939                     if (advice==null)
940                         advice=_multiFrameAdvice;
941                     else // could probably cache this
942                         advice=multiFrameAdvice((JSON.Literal)advice);
943                 }
944             }
945 
946             synchronized(this)
947             {
948                 if (advice==null)
949                 {
950                     if (_adviceVersion!=client._adviseVersion)
951                     {
952                         advice=_advice;
953                         client._adviseVersion=_adviceVersion;
954                     }
955                 }
956                 else
957                     client._adviseVersion=-1; // clear so it is reset after multi state clears
958             }
959 
960             // reply to connect message
961             String id=message.getId();
962 
963             Message reply=newMessage(message);
964 
965             reply.put(CHANNEL_FIELD,META_CONNECT);
966             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
967             if (advice!=null)
968                 reply.put(ADVICE_FIELD,advice);
969             if (id!=null)
970                 reply.put(ID_FIELD,id);
971 
972             if (polling)
973                 transport.setPollReply(reply);
974             else
975             {
976                 for (Extension e:_extensions)
977                     reply=e.sendMeta(reply);
978                 transport.send(reply);
979             }
980         }
981     }
982 
983     /* ------------------------------------------------------------ */
984     /* ------------------------------------------------------------ */
985     protected class DisconnectHandler extends Handler
986     {
987         @Override
getMetaChannelId()988         ChannelId getMetaChannelId()
989         {
990             return META_DISCONNECT_ID;
991         }
992 
993         @Override
handle(ClientImpl client, Transport transport, Message message)994         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
995         {
996             if (client==null)
997             {
998                 unknownClient(transport,META_DISCONNECT);
999                 return;
1000             }
1001             if (isLogInfo())
1002                 logInfo("Disconnect "+client.getId());
1003 
1004             client.remove(false);
1005 
1006             Message reply=newMessage(message);
1007             reply.put(CHANNEL_FIELD,META_DISCONNECT);
1008             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1009             String id=message.getId();
1010             if (id!=null)
1011                 reply.put(ID_FIELD,id);
1012 
1013             for (Extension e:_extensions)
1014                 reply=e.sendMeta(reply);
1015 
1016             Message pollReply = transport.getPollReply();
1017             if (pollReply!=null)
1018             {
1019                 for (Extension e:_extensions)
1020                     pollReply=e.sendMeta(pollReply);
1021                 transport.send(pollReply);
1022                 transport.setPollReply(null);
1023             }
1024             transport.send(reply);
1025         }
1026     }
1027 
1028 
1029     /* ------------------------------------------------------------ */
1030     /* ------------------------------------------------------------ */
1031     protected class HandshakeHandler extends Handler
1032     {
1033         @Override
getMetaChannelId()1034         ChannelId getMetaChannelId()
1035         {
1036             return META_HANDSHAKE_ID;
1037         }
1038 
1039         @Override
handle(ClientImpl client, Transport transport, Message message)1040         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1041         {
1042             if (client!=null)
1043                 throw new IllegalStateException();
1044 
1045             if (_securityPolicy!=null && !_securityPolicy.canHandshake(message))
1046             {
1047                 Message reply=newMessage(message);
1048                 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1049                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1050                 reply.put(ERROR_FIELD,"403::Handshake denied");
1051 
1052                 for (Extension e:_extensions)
1053                     reply=e.sendMeta(reply);
1054 
1055                 transport.send(reply);
1056                 return;
1057             }
1058 
1059             client=newRemoteClient();
1060 
1061             Map<?,?> ext = (Map<?,?>)message.get(EXT_FIELD);
1062 
1063             boolean commented=_JSONCommented && ext!=null && Boolean.TRUE.equals(ext.get("json-comment-filtered"));
1064 
1065             Message reply=newMessage(message);
1066             reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1067             reply.put("version","1.0");
1068             reply.put("minimumVersion","0.9");
1069             if (isJSONCommented())
1070                 reply.put(EXT_FIELD,EXT_JSON_COMMENTED);
1071 
1072             if (client!=null)
1073             {
1074                 reply.put("supportedConnectionTypes",_transports);
1075                 reply.put("successful",Boolean.TRUE);
1076                 reply.put(CLIENT_FIELD,client.getId());
1077                 if (_advice!=null)
1078                     reply.put(ADVICE_FIELD,_advice);
1079                 client.setJSONCommented(commented);
1080                 transport.setJSONCommented(commented);
1081             }
1082             else
1083             {
1084                 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1085                 if (_advice!=null)
1086                     reply.put(ADVICE_FIELD,_advice);
1087             }
1088 
1089             if (isLogDebug())
1090                 logDebug("handshake.handle: reply="+reply);
1091 
1092             String id=message.getId();
1093             if (id!=null)
1094                 reply.put(ID_FIELD,id);
1095 
1096             for (Extension e:_extensions)
1097                 reply=e.sendMeta(reply);
1098             transport.send(reply);
1099         }
1100     }
1101 
1102     /* ------------------------------------------------------------ */
1103     /* ------------------------------------------------------------ */
1104     protected class PublishHandler extends Handler
1105     {
1106         @Override
getMetaChannelId()1107         ChannelId getMetaChannelId()
1108         {
1109             return null;
1110         }
1111 
1112         @Override
handle(ClientImpl client, Transport transport, Message message)1113         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1114         {
1115             String channel_id=message.getChannel();
1116 
1117             if (client==null && message.containsKey(CLIENT_FIELD))
1118             {
1119                 unknownClient(transport,channel_id);
1120                 return;
1121             }
1122 
1123             String id=message.getId();
1124 
1125             ChannelId cid=getChannelId(channel_id);
1126             Object data=message.get(Bayeux.DATA_FIELD);
1127 
1128             Message reply=newMessage(message);
1129             reply.put(CHANNEL_FIELD,channel_id);
1130             if (id!=null)
1131                 reply.put(ID_FIELD,id);
1132 
1133             if (data!=null&&_securityPolicy.canPublish(client,channel_id,message))
1134             {
1135                 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1136 
1137                 for (Extension e:_extensions)
1138                     reply=e.sendMeta(reply);
1139 
1140                 transport.send(reply);
1141                 if (_directDeliver)
1142                 {
1143                     message.remove(CLIENT_FIELD);
1144                     for (Extension e:_extensions)
1145                         message=e.send(message);
1146                     _root.doDelivery(cid,client,message);
1147                 }
1148                 else
1149                     doPublish(cid,client,data,id==null?null:id);
1150             }
1151             else
1152             {
1153                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1154                 reply.put(ERROR_FIELD,"403::Publish denied");
1155 
1156                 for (Extension e:_extensions)
1157                     reply=e.sendMeta(reply);
1158                 transport.send(reply);
1159             }
1160         }
1161     }
1162 
1163     /* ------------------------------------------------------------ */
1164     /* ------------------------------------------------------------ */
1165     protected class MetaPublishHandler extends Handler
1166     {
1167         @Override
getMetaChannelId()1168         ChannelId getMetaChannelId()
1169         {
1170             return null;
1171         }
1172 
1173         @Override
handle(ClientImpl client, Transport transport, Message message)1174         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1175         {
1176             String channel_id=message.getChannel();
1177 
1178             if (client==null && !META_HANDSHAKE.equals(channel_id))
1179             {
1180                 // unknown client
1181                 return;
1182             }
1183 
1184             if(_securityPolicy.canPublish(client,channel_id,message))
1185             {
1186                 _root.doDelivery(getChannelId(channel_id),client,message);
1187             }
1188         }
1189     }
1190 
1191     /* ------------------------------------------------------------ */
1192     /* ------------------------------------------------------------ */
1193     protected class SubscribeHandler extends Handler
1194     {
1195         @Override
getMetaChannelId()1196         ChannelId getMetaChannelId()
1197         {
1198             return META_SUBSCRIBE_ID;
1199         }
1200 
1201         @Override
handle(ClientImpl client, Transport transport, Message message)1202         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1203         {
1204             if (client==null)
1205             {
1206                 unknownClient(transport,META_SUBSCRIBE);
1207                 return;
1208             }
1209 
1210             String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1211 
1212             // select a random channel ID if none specifified
1213             if (subscribe_id==null)
1214             {
1215                 subscribe_id=Long.toString(getRandom(),36);
1216                 while (getChannel(subscribe_id)!=null)
1217                     subscribe_id=Long.toString(getRandom(),36);
1218             }
1219 
1220             ChannelId cid=null;
1221             boolean can_subscribe=false;
1222 
1223             if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1224             {
1225                 can_subscribe=true;
1226             }
1227             else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1228             {
1229                 can_subscribe=false;
1230             }
1231             else
1232             {
1233                 cid=getChannelId(subscribe_id);
1234                 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1235             }
1236 
1237             Message reply=newMessage(message);
1238             reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1239             reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1240 
1241             if (can_subscribe)
1242             {
1243                 if (cid!=null)
1244                 {
1245                     ChannelImpl channel=getChannel(cid);
1246                     if (channel==null&&_securityPolicy.canCreate(client,subscribe_id,message))
1247                         channel=(ChannelImpl)getChannel(subscribe_id, true);
1248 
1249                     if (channel!=null)
1250                         channel.subscribe(client);
1251                     else
1252                         can_subscribe=false;
1253                 }
1254 
1255                 if (can_subscribe)
1256                 {
1257                     reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1258                 }
1259                 else
1260                 {
1261                     reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1262                     reply.put(ERROR_FIELD,"403::cannot create");
1263                 }
1264             }
1265             else
1266             {
1267                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1268                 reply.put(ERROR_FIELD,"403::cannot subscribe");
1269 
1270             }
1271 
1272             String id=message.getId();
1273             if (id!=null)
1274                 reply.put(ID_FIELD,id);
1275             for (Extension e:_extensions)
1276                 reply=e.sendMeta(reply);
1277             transport.send(reply);
1278         }
1279     }
1280 
1281     /* ------------------------------------------------------------ */
1282     /* ------------------------------------------------------------ */
1283     protected class UnsubscribeHandler extends Handler
1284     {
1285         @Override
getMetaChannelId()1286         ChannelId getMetaChannelId()
1287         {
1288             return META_UNSUBSCRIBE_ID;
1289         }
1290 
1291         @Override
handle(ClientImpl client, Transport transport, Message message)1292         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1293         {
1294             if (client==null)
1295             {
1296                 unknownClient(transport,META_UNSUBSCRIBE);
1297                 return;
1298             }
1299 
1300             String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1301             ChannelImpl channel=getChannel(channel_id);
1302             if (channel!=null)
1303                 channel.unsubscribe(client);
1304 
1305             Message reply=newMessage(message);
1306             reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1307             if (channel!=null)
1308             {
1309                 channel.unsubscribe(client);
1310                 reply.put(SUBSCRIPTION_FIELD,channel.getId());
1311                 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1312             }
1313             else
1314                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1315 
1316             String id=message.getId();
1317             if (id!=null)
1318                 reply.put(ID_FIELD,id);
1319             for (Extension e:_extensions)
1320                 reply=e.sendMeta(reply);
1321             transport.send(reply);
1322         }
1323     }
1324 
1325     /* ------------------------------------------------------------ */
1326     /* ------------------------------------------------------------ */
1327     protected class PingHandler extends Handler
1328     {
1329         @Override
getMetaChannelId()1330         ChannelId getMetaChannelId()
1331         {
1332             return META_PING_ID;
1333         }
1334 
1335         @Override
handle(ClientImpl client, Transport transport, Message message)1336         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1337         {
1338             Message reply=newMessage(message);
1339             reply.put(CHANNEL_FIELD,META_PING);
1340             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1341 
1342             String id=message.getId();
1343             if (id!=null)
1344                 reply.put(ID_FIELD,id);
1345             for (Extension e:_extensions)
1346                 reply=e.sendMeta(reply);
1347             transport.send(reply);
1348         }
1349     }
1350 
1351 
1352     /* ------------------------------------------------------------ */
1353     /* ------------------------------------------------------------ */
1354     protected class ServiceChannel extends ChannelImpl
1355     {
ServiceChannel(String id)1356         ServiceChannel(String id)
1357         {
1358             super(id,AbstractBayeux.this);
1359         }
1360 
1361         /* ------------------------------------------------------------ */
1362         /* (non-Javadoc)
1363          * @see org.mortbay.cometd.ChannelImpl#addChild(org.mortbay.cometd.ChannelImpl)
1364          */
1365         @Override
addChild(ChannelImpl channel)1366         public void addChild(ChannelImpl channel)
1367         {
1368             super.addChild(channel);
1369             setPersistent(true);
1370         }
1371 
1372         /* ------------------------------------------------------------ */
1373         @Override
subscribe(Client client)1374         public void subscribe(Client client)
1375         {
1376             if (client.isLocal())
1377                 super.subscribe(client);
1378         }
1379 
1380     }
1381 }
1382