1 // ======================================================================== 2 // Copyright 2006 Mort Bay Consulting Pty. Ltd. 3 // ------------------------------------------------------------------------ 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 //======================================================================== 14 15 package org.mortbay.cometd; 16 17 import java.io.IOException; 18 import java.security.SecureRandom; 19 import java.util.ArrayList; 20 import java.util.Collection; 21 import java.util.Collections; 22 import java.util.HashMap; 23 import java.util.List; 24 import java.util.ListIterator; 25 import java.util.Map; 26 import java.util.Random; 27 import java.util.Set; 28 import java.util.concurrent.ConcurrentHashMap; 29 import java.util.concurrent.CopyOnWriteArrayList; 30 31 import javax.servlet.ServletContext; 32 import javax.servlet.http.HttpServletRequest; 33 34 import org.cometd.Bayeux; 35 import org.cometd.BayeuxListener; 36 import org.cometd.Channel; 37 import org.cometd.ChannelBayeuxListener; 38 import org.cometd.Client; 39 import org.cometd.ClientBayeuxListener; 40 import org.cometd.Extension; 41 import org.cometd.Message; 42 import org.cometd.SecurityPolicy; 43 import org.mortbay.util.ajax.JSON; 44 45 46 /* ------------------------------------------------------------ */ 47 /** 48 * @author gregw 49 * @author aabeling: added JSONP transport 50 * 51 */ 52 public abstract class AbstractBayeux extends MessagePool implements Bayeux 53 { 54 public static final ChannelId META_ID=new ChannelId(META); 55 public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT); 56 public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT); 57 public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT); 58 public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE); 59 public static final ChannelId META_PING_ID=new ChannelId(META_PING); 60 public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS); 61 public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE); 62 public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE); 63 64 65 private static final Map<String,Object> EXT_JSON_COMMENTED=new HashMap<String,Object>(2){ 66 { 67 this.put("json-comment-filtered",Boolean.TRUE); 68 } 69 }; 70 71 72 private HashMap<String,Handler> _handlers=new HashMap<String,Handler>(); 73 74 private ChannelImpl _root = new ChannelImpl("/",this); 75 private ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>(); 76 protected SecurityPolicy _securityPolicy=new DefaultPolicy(); 77 protected JSON.Literal _advice; 78 protected JSON.Literal _multiFrameAdvice; 79 protected int _adviceVersion=0; 80 protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}"); 81 protected int _logLevel; 82 protected long _timeout=240000; 83 protected long _interval=0; 84 protected long _maxInterval=30000; 85 protected boolean _JSONCommented; 86 protected boolean _initialized; 87 protected ConcurrentHashMap<String, List<String>> _browser2client=new ConcurrentHashMap<String, List<String>>(); 88 protected int _multiFrameInterval=-1; 89 90 protected boolean _directDeliver=true; 91 protected boolean _requestAvailable; 92 protected ThreadLocal<HttpServletRequest> _request = new ThreadLocal<HttpServletRequest>(); 93 94 transient ServletContext _context; 95 transient Random _random; 96 transient ConcurrentHashMap<String, ChannelId> _channelIdCache; 97 protected Handler _publishHandler; 98 protected Handler _metaPublishHandler; 99 protected int _maxClientQueue=-1; 100 101 protected List<Extension> _extensions=new CopyOnWriteArrayList<Extension>(); 102 protected JSON.Literal _transports=new JSON.Literal("[\""+Bayeux.TRANSPORT_LONG_POLL+ "\",\""+Bayeux.TRANSPORT_CALLBACK_POLL+"\"]"); 103 protected List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>(); 104 protected List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>(); 105 106 /* ------------------------------------------------------------ */ 107 /** 108 * @param context. 109 * The logLevel init parameter is used to set the logging to 110 * 0=none, 1=info, 2=debug 111 */ AbstractBayeux()112 protected AbstractBayeux() 113 { 114 _publishHandler=new PublishHandler(); 115 _metaPublishHandler=new MetaPublishHandler(); 116 _handlers.put(META_HANDSHAKE,new HandshakeHandler()); 117 _handlers.put(META_CONNECT,new ConnectHandler()); 118 _handlers.put(META_DISCONNECT,new DisconnectHandler()); 119 _handlers.put(META_SUBSCRIBE,new SubscribeHandler()); 120 _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler()); 121 _handlers.put(META_PING,new PingHandler()); 122 123 setTimeout(getTimeout()); 124 } 125 126 /* ------------------------------------------------------------ */ addExtension(Extension ext)127 public void addExtension(Extension ext) 128 { 129 _extensions.add(ext); 130 } 131 132 /* ------------------------------------------------------------ */ getExtensions()133 public List<Extension> getExtensions() 134 { 135 // TODO - remove this hack of a method! 136 return _extensions; 137 } 138 139 /* ------------------------------------------------------------ */ removeExtension(Extension ext)140 public void removeExtension(Extension ext) 141 { 142 _extensions.remove(ext); 143 } 144 145 /* ------------------------------------------------------------ */ 146 /** 147 * @param id 148 * @return 149 */ getChannel(ChannelId id)150 public ChannelImpl getChannel(ChannelId id) 151 { 152 return _root.getChild(id); 153 } 154 155 /* ------------------------------------------------------------ */ getChannel(String id)156 public ChannelImpl getChannel(String id) 157 { 158 ChannelId cid=getChannelId(id); 159 if (cid.depth()==0) 160 return null; 161 return _root.getChild(cid); 162 } 163 164 /* ------------------------------------------------------------ */ getChannel(String id, boolean create)165 public Channel getChannel(String id, boolean create) 166 { 167 synchronized(this) 168 { 169 ChannelImpl channel=getChannel(id); 170 171 if (channel==null && create) 172 { 173 channel=new ChannelImpl(id,this); 174 _root.addChild(channel); 175 176 if (isLogInfo()) 177 logInfo("newChannel: "+channel); 178 } 179 return channel; 180 } 181 } 182 183 /* ------------------------------------------------------------ */ getChannelId(String id)184 public ChannelId getChannelId(String id) 185 { 186 ChannelId cid = _channelIdCache.get(id); 187 if (cid==null) 188 { 189 // TODO shrink cache! 190 cid=new ChannelId(id); 191 _channelIdCache.put(id,cid); 192 } 193 return cid; 194 } 195 196 /* ------------------------------------------------------------ */ 197 /* (non-Javadoc) 198 * @see org.mortbay.cometd.Bx#getClient(java.lang.String) 199 */ getClient(String client_id)200 public Client getClient(String client_id) 201 { 202 synchronized(this) 203 { 204 if (client_id==null) 205 return null; 206 Client client = _clients.get(client_id); 207 return client; 208 } 209 } 210 211 /* ------------------------------------------------------------ */ getClientIDs()212 public Set<String> getClientIDs() 213 { 214 return _clients.keySet(); 215 } 216 217 /* ------------------------------------------------------------ */ 218 /** 219 * @return The maximum time in ms to wait between polls before timing out a client 220 */ getMaxInterval()221 public long getMaxInterval() 222 { 223 return _maxInterval; 224 } 225 226 /* ------------------------------------------------------------ */ 227 /** 228 * @return the logLevel. 0=none, 1=info, 2=debug 229 */ getLogLevel()230 public int getLogLevel() 231 { 232 return _logLevel; 233 } 234 235 /* ------------------------------------------------------------ */ 236 /* (non-Javadoc) 237 * @see org.mortbay.cometd.Bx#getSecurityPolicy() 238 */ getSecurityPolicy()239 public SecurityPolicy getSecurityPolicy() 240 { 241 return _securityPolicy; 242 } 243 244 /* ------------------------------------------------------------ */ getTimeout()245 public long getTimeout() 246 { 247 return _timeout; 248 } 249 250 /* ------------------------------------------------------------ */ getInterval()251 public long getInterval() 252 { 253 return _interval; 254 } 255 256 /* ------------------------------------------------------------ */ 257 /** 258 * @return true if published messages are directly delivered to subscribers. False if 259 * a new message is to be created that holds only supported fields. 260 */ isDirectDeliver()261 public boolean isDirectDeliver() 262 { 263 return _directDeliver; 264 } 265 266 /* ------------------------------------------------------------ */ 267 /** 268 * @param directDeliver true if published messages are directly delivered to subscribers. False if 269 * a new message is to be created that holds only supported fields. 270 */ setDirectDeliver(boolean directDeliver)271 public void setDirectDeliver(boolean directDeliver) 272 { 273 _directDeliver = directDeliver; 274 } 275 276 /* ------------------------------------------------------------ */ 277 /** Handle a Bayeux message. 278 * This is normally only called by the bayeux servlet or a test harness. 279 * @param client The client if known 280 * @param transport The transport to use for the message 281 * @param message The bayeux message. 282 */ handle(ClientImpl client, Transport transport, Message message)283 public String handle(ClientImpl client, Transport transport, Message message) throws IOException 284 { 285 String channel_id=message.getChannel(); 286 287 Handler handler=(Handler)_handlers.get(channel_id); 288 if (handler!=null) 289 { 290 // known meta channel 291 ListIterator<Extension> iter = _extensions.listIterator(_extensions.size()); 292 while(iter.hasPrevious()) 293 message=iter.previous().rcvMeta(message); 294 295 handler.handle(client,transport,message); 296 _metaPublishHandler.handle(client,transport,message); 297 } 298 else if (channel_id.startsWith(META_SLASH)) 299 { 300 // unknown meta channel 301 ListIterator<Extension> iter = _extensions.listIterator(_extensions.size()); 302 while(iter.hasPrevious()) 303 message=iter.previous().rcvMeta(message); 304 _metaPublishHandler.handle(client,transport,message); 305 } 306 else 307 { 308 // non meta channel 309 handler=_publishHandler; 310 ListIterator<Extension> iter = _extensions.listIterator(_extensions.size()); 311 while(iter.hasPrevious()) 312 message=iter.previous().rcv(message); 313 handler.handle(client,transport,message); 314 } 315 316 return channel_id; 317 } 318 319 /* ------------------------------------------------------------ */ hasChannel(String id)320 public boolean hasChannel(String id) 321 { 322 ChannelId cid=getChannelId(id); 323 return _root.getChild(cid)!=null; 324 } 325 326 /* ------------------------------------------------------------ */ isInitialized()327 public boolean isInitialized() 328 { 329 return _initialized; 330 } 331 332 /* ------------------------------------------------------------ */ 333 /** 334 * @return the commented 335 */ isJSONCommented()336 public boolean isJSONCommented() 337 { 338 return _JSONCommented; 339 } 340 341 /* ------------------------------------------------------------ */ isLogDebug()342 public boolean isLogDebug() 343 { 344 return _logLevel>1; 345 } 346 347 /* ------------------------------------------------------------ */ isLogInfo()348 public boolean isLogInfo() 349 { 350 return _logLevel>0; 351 } 352 353 /* ------------------------------------------------------------ */ logDebug(String message)354 public void logDebug(String message) 355 { 356 if (_logLevel>1) 357 _context.log(message); 358 } 359 360 /* ------------------------------------------------------------ */ logDebug(String message, Throwable th)361 public void logDebug(String message, Throwable th) 362 { 363 if (_logLevel>1) 364 _context.log(message,th); 365 } 366 367 /* ------------------------------------------------------------ */ logWarn(String message, Throwable th)368 public void logWarn(String message, Throwable th) 369 { 370 _context.log(message+": "+th.toString()); 371 } 372 373 /* ------------------------------------------------------------ */ logWarn(String message)374 public void logWarn(String message) 375 { 376 _context.log(message); 377 } 378 379 /* ------------------------------------------------------------ */ logInfo(String message)380 public void logInfo(String message) 381 { 382 if (_logLevel>0) 383 _context.log(message); 384 } 385 386 /* ------------------------------------------------------------ */ newClient(String idPrefix)387 public Client newClient(String idPrefix) 388 { 389 ClientImpl client = new ClientImpl(this,idPrefix); 390 return client; 391 } 392 393 /* ------------------------------------------------------------ */ newRemoteClient()394 public abstract ClientImpl newRemoteClient(); 395 396 /* ------------------------------------------------------------ */ 397 /** Create new transport object for a bayeux message 398 * @param client The client 399 * @param message the bayeux message 400 * @return the negotiated transport. 401 */ newTransport(ClientImpl client, Map<?,?> message)402 public Transport newTransport(ClientImpl client, Map<?,?> message) 403 { 404 if (isLogDebug()) 405 logDebug("newTransport: client="+client+",message="+message); 406 407 Transport result=null; 408 409 try 410 { 411 String type=client==null?null:client.getConnectionType(); 412 if (type==null) 413 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD); 414 415 if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type) || type==null) 416 { 417 String jsonp=(String)message.get(Bayeux.JSONP_PARAMETER); 418 if(jsonp!=null) 419 result=new JSONPTransport(client!=null&&client.isJSONCommented(),jsonp); 420 else 421 result=new JSONTransport(client!=null&&client.isJSONCommented()); 422 } 423 else 424 result=new JSONTransport(client!=null&&client.isJSONCommented()); 425 426 } 427 catch (Exception e) 428 { 429 throw new RuntimeException(e); 430 } 431 432 if (isLogDebug()) 433 logDebug("newTransport: result="+result); 434 return result; 435 } 436 437 /* ------------------------------------------------------------ */ 438 /** Publish data to a channel. 439 * Creates a message and delivers it to the root channel. 440 * @param to 441 * @param from 442 * @param data 443 * @param msgId 444 */ doPublish(ChannelId to, Client from, Object data, String msgId)445 protected void doPublish(ChannelId to, Client from, Object data, String msgId) 446 { 447 Message msg = newMessage(); 448 msg.put(CHANNEL_FIELD,to.toString()); 449 450 if (msgId==null) 451 { 452 long id=msg.hashCode() 453 ^(to==null?0:to.hashCode()) 454 ^(from==null?0:from.hashCode()); 455 id=id<0?-id:id; 456 msg.put(ID_FIELD,Long.toString(id,36)); 457 } 458 else 459 msg.put(ID_FIELD,msgId); 460 461 msg.put(DATA_FIELD,data); 462 463 for (Extension e:_extensions) 464 msg=e.send(msg); 465 _root.doDelivery(to,from,msg); 466 ((MessageImpl)msg).decRef(); 467 } 468 469 /* ------------------------------------------------------------ */ 470 public boolean removeChannel(ChannelImpl channel) 471 { 472 boolean removed = _root.doRemove(channel); 473 if (removed) 474 for (ChannelBayeuxListener l : _channelListeners) 475 l.channelRemoved(channel); 476 return removed; 477 } 478 479 /* ------------------------------------------------------------ */ 480 public void addChannel(ChannelImpl channel) 481 { 482 for (ChannelBayeuxListener l : _channelListeners) 483 l.channelAdded(channel); 484 } 485 486 /* ------------------------------------------------------------ */ 487 protected String newClientId(long variation, String idPrefix) 488 { 489 if (idPrefix==null) 490 return Long.toString(getRandom(),36)+Long.toString(variation,36); 491 else 492 return idPrefix+"_"+Long.toString(getRandom(),36); 493 } 494 495 /* ------------------------------------------------------------ */ 496 protected void addClient(ClientImpl client,String idPrefix) 497 { 498 while(true) 499 { 500 String id = newClientId(client.hashCode(),idPrefix); 501 client.setId(id); 502 503 ClientImpl other = _clients.putIfAbsent(id,client); 504 if (other==null) 505 { 506 for (ClientBayeuxListener l : _clientListeners) 507 l.clientAdded((Client)client); 508 509 return; 510 } 511 } 512 } 513 514 /* ------------------------------------------------------------ */ 515 /* (non-Javadoc) 516 * @see org.mortbay.cometd.Bx#removeClient(java.lang.String) 517 */ 518 public Client removeClient(String client_id) 519 { 520 ClientImpl client; 521 synchronized(this) 522 { 523 if (client_id==null) 524 return null; 525 client = _clients.remove(client_id); 526 } 527 if (client!=null) 528 { 529 client.unsubscribeAll(); 530 for (ClientBayeuxListener l : _clientListeners) 531 l.clientRemoved((Client)client); 532 } 533 return client; 534 } 535 536 /* ------------------------------------------------------------ */ 537 /** 538 * @param ms The maximum time in ms to wait between polls before timing out a client 539 */ 540 public void setMaxInterval(long ms) 541 { 542 _maxInterval=ms; 543 } 544 545 /* ------------------------------------------------------------ */ 546 /** 547 * @param commented the commented to set 548 */ 549 public void setJSONCommented(boolean commented) 550 { 551 _JSONCommented=commented; 552 } 553 554 /* ------------------------------------------------------------ */ 555 /** 556 * @param logLevel 557 * the logLevel: 0=none, 1=info, 2=debug 558 */ 559 public void setLogLevel(int logLevel) 560 { 561 _logLevel=logLevel; 562 } 563 564 /* ------------------------------------------------------------ */ 565 /* (non-Javadoc) 566 * @see org.mortbay.cometd.Bx#setSecurityPolicy(org.mortbay.cometd.SecurityPolicy) 567 */ 568 public void setSecurityPolicy(SecurityPolicy securityPolicy) 569 { 570 _securityPolicy=securityPolicy; 571 } 572 573 574 /* ------------------------------------------------------------ */ 575 public void setTimeout(long ms) 576 { 577 _timeout = ms; 578 generateAdvice(); 579 } 580 581 582 /* ------------------------------------------------------------ */ 583 public void setInterval(long ms) 584 { 585 _interval = ms; 586 generateAdvice(); 587 } 588 589 /* ------------------------------------------------------------ */ 590 /** 591 * The time a client should delay between reconnects when multiple 592 * connections from the same browser are detected. This effectively 593 * produces traditional polling. 594 * @param multiFrameInterval the multiFrameInterval to set 595 */ 596 public void setMultiFrameInterval(int multiFrameInterval) 597 { 598 _multiFrameInterval=multiFrameInterval; 599 generateAdvice(); 600 } 601 602 /* ------------------------------------------------------------ */ 603 /** 604 * @return the multiFrameInterval in milliseconds 605 */ 606 public int getMultiFrameInterval() 607 { 608 return _multiFrameInterval; 609 } 610 611 /* ------------------------------------------------------------ */ 612 void generateAdvice() 613 { 614 setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":"+getInterval()+",\"timeout\":"+getTimeout()+"}")); 615 } 616 617 /* ------------------------------------------------------------ */ 618 public void setAdvice(JSON.Literal advice) 619 { 620 synchronized(this) 621 { 622 _adviceVersion++; 623 _advice=advice; 624 _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice))); 625 } 626 } 627 628 /* ------------------------------------------------------------ */ 629 private Map<String,Object> multiFrameAdvice(JSON.Literal advice) 630 { 631 Map<String,Object> a = (Map<String,Object>)JSON.parse(_advice.toString()); 632 a.put("multiple-clients",Boolean.TRUE); 633 if (_multiFrameInterval>0) 634 { 635 a.put("reconnect","retry"); 636 a.put("interval",_multiFrameInterval); 637 } 638 else 639 a.put("reconnect","none"); 640 return a; 641 } 642 643 644 645 /* ------------------------------------------------------------ */ getAdvice()646 public JSON.Literal getAdvice() 647 { 648 return _advice; 649 } 650 651 /* ------------------------------------------------------------ */ 652 /** 653 * @return TRUE if {@link #getCurrentRequest()} will return the current request 654 */ isRequestAvailable()655 public boolean isRequestAvailable() 656 { 657 return _requestAvailable; 658 } 659 660 /* ------------------------------------------------------------ */ 661 /** 662 * @param requestAvailable TRUE if {@link #getCurrentRequest()} will return the current request 663 */ setRequestAvailable(boolean requestAvailable)664 public void setRequestAvailable(boolean requestAvailable) 665 { 666 _requestAvailable=requestAvailable; 667 } 668 669 /* ------------------------------------------------------------ */ 670 /** 671 * @return the current request if {@link #isRequestAvailable()} is true, else null 672 */ getCurrentRequest()673 public HttpServletRequest getCurrentRequest() 674 { 675 return _request.get(); 676 } 677 678 /* ------------------------------------------------------------ */ 679 /** 680 * @return the current request if {@link #isRequestAvailable()} is true, else null 681 */ setCurrentRequest(HttpServletRequest request)682 void setCurrentRequest(HttpServletRequest request) 683 { 684 _request.set(request); 685 } 686 687 688 689 /* ------------------------------------------------------------ */ getChannels()690 public Collection<Channel> getChannels() 691 { 692 List<Channel> channels = new ArrayList<Channel>(); 693 _root.getChannels(channels); 694 return channels; 695 } 696 697 /* ------------------------------------------------------------ */ 698 /** 699 * @return 700 */ getChannelCount()701 public int getChannelCount() 702 { 703 return _root.getChannelCount(); 704 } 705 706 /* ------------------------------------------------------------ */ getClients()707 public Collection<Client> getClients() 708 { 709 synchronized(this) 710 { 711 return new ArrayList<Client>(_clients.values()); 712 } 713 } 714 715 /* ------------------------------------------------------------ */ 716 /** 717 * @return 718 */ getClientCount()719 public int getClientCount() 720 { 721 synchronized(this) 722 { 723 return _clients.size(); 724 } 725 } 726 727 /* ------------------------------------------------------------ */ hasClient(String clientId)728 public boolean hasClient(String clientId) 729 { 730 synchronized(this) 731 { 732 if (clientId==null) 733 return false; 734 return _clients.containsKey(clientId); 735 } 736 } 737 738 /* ------------------------------------------------------------ */ removeChannel(String channelId)739 public Channel removeChannel(String channelId) 740 { 741 Channel channel = getChannel(channelId); 742 743 boolean removed = false; 744 if (channel!=null) 745 removed = channel.remove(); 746 747 if (removed) 748 return channel; 749 else 750 return null; 751 } 752 753 /* ------------------------------------------------------------ */ initialize(ServletContext context)754 protected void initialize(ServletContext context) 755 { 756 synchronized(this) 757 { 758 _initialized=true; 759 _context=context; 760 try 761 { 762 _random=SecureRandom.getInstance("SHA1PRNG"); 763 } 764 catch (Exception e) 765 { 766 context.log("Could not get secure random for ID generation",e); 767 _random=new Random(); 768 } 769 _random.setSeed(_random.nextLong()^hashCode()^(context.hashCode()<<32)^Runtime.getRuntime().freeMemory()); 770 _channelIdCache=new ConcurrentHashMap<String, ChannelId>(); 771 772 _root.addChild(new ServiceChannel(Bayeux.SERVICE)); 773 774 } 775 } 776 777 /* ------------------------------------------------------------ */ getRandom()778 long getRandom() 779 { 780 long l=_random.nextLong(); 781 return l<0?-l:l; 782 } 783 784 /* ------------------------------------------------------------ */ clientOnBrowser(String browserId,String clientId)785 void clientOnBrowser(String browserId,String clientId) 786 { 787 List<String> clients=_browser2client.get(browserId); 788 if (clients==null) 789 { 790 List<String> new_clients=new CopyOnWriteArrayList<String>(); 791 clients=_browser2client.putIfAbsent(browserId,new_clients); 792 if (clients==null) 793 clients=new_clients; 794 } 795 clients.add(clientId); 796 } 797 798 /* ------------------------------------------------------------ */ clientOffBrowser(String browserId,String clientId)799 void clientOffBrowser(String browserId,String clientId) 800 { 801 List<String> clients=_browser2client.get(browserId); 802 if (clients!=null) 803 clients.remove(clientId); 804 } 805 806 /* ------------------------------------------------------------ */ clientsOnBrowser(String browserId)807 List<String> clientsOnBrowser(String browserId) 808 { 809 List<String> clients=_browser2client.get(browserId); 810 if (clients==null) 811 return Collections.emptyList(); 812 return clients; 813 } 814 815 /* ------------------------------------------------------------ */ addListener(BayeuxListener listener)816 public void addListener(BayeuxListener listener) 817 { 818 if (listener instanceof ClientBayeuxListener) 819 _clientListeners.add((ClientBayeuxListener)listener); 820 else if(listener instanceof ChannelBayeuxListener) 821 _channelListeners.add((ChannelBayeuxListener)listener); 822 } 823 824 /* ------------------------------------------------------------ */ getMaxClientQueue()825 public int getMaxClientQueue() 826 { 827 return _maxClientQueue; 828 } 829 830 /* ------------------------------------------------------------ */ setMaxClientQueue(int size)831 public void setMaxClientQueue(int size) 832 { 833 _maxClientQueue=size; 834 } 835 836 837 838 /* ------------------------------------------------------------ */ 839 /* ------------------------------------------------------------ */ 840 public static class DefaultPolicy implements SecurityPolicy 841 { canHandshake(Message message)842 public boolean canHandshake(Message message) 843 { 844 return true; 845 } 846 canCreate(Client client, String channel, Message message)847 public boolean canCreate(Client client, String channel, Message message) 848 { 849 return client!=null && !channel.startsWith(Bayeux.META_SLASH); 850 } 851 canSubscribe(Client client, String channel, Message message)852 public boolean canSubscribe(Client client, String channel, Message message) 853 { 854 if (client!=null && ("/**".equals(channel) || "/*".equals(channel))) 855 return false; 856 return client!=null && !channel.startsWith(Bayeux.META_SLASH); 857 } 858 canPublish(Client client, String channel, Message message)859 public boolean canPublish(Client client, String channel, Message message) 860 { 861 return client!=null || client==null && Bayeux.META_HANDSHAKE.equals(channel); 862 } 863 864 } 865 866 867 /* ------------------------------------------------------------ */ 868 /* ------------------------------------------------------------ */ 869 protected abstract class Handler 870 { 871 abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException; 872 abstract ChannelId getMetaChannelId(); unknownClient(Transport transport,String channel)873 void unknownClient(Transport transport,String channel) throws IOException 874 { 875 MessageImpl reply=newMessage(); 876 877 reply.put(CHANNEL_FIELD,channel); 878 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE); 879 reply.put(ERROR_FIELD,"402::Unknown client"); 880 reply.put("advice",_handshakeAdvice); 881 transport.send(reply); 882 } 883 } 884 885 /* ------------------------------------------------------------ */ 886 /* ------------------------------------------------------------ */ 887 protected class ConnectHandler extends Handler 888 { 889 protected String _metaChannel=META_CONNECT; 890 891 @Override getMetaChannelId()892 ChannelId getMetaChannelId() 893 { 894 return META_CONNECT_ID; 895 } 896 897 @Override handle(ClientImpl client, Transport transport, Message message)898 public void handle(ClientImpl client, Transport transport, Message message) throws IOException 899 { 900 if (client==null) 901 { 902 unknownClient(transport,_metaChannel); 903 return; 904 } 905 906 // is this the first connect message? 907 String type=client.getConnectionType(); 908 boolean polling=true; 909 if (type==null) 910 { 911 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD); 912 client.setConnectionType(type); 913 polling=false; 914 } 915 916 Object advice = message.get(ADVICE_FIELD); 917 if (advice!=null) 918 { 919 Long timeout=(Long)((Map)advice).get("timeout"); 920 if (timeout!=null && timeout.longValue()>0) 921 client.setTimeout(timeout.longValue()); 922 else 923 client.setTimeout(0); 924 } 925 else 926 client.setTimeout(0); 927 928 advice=null; 929 930 // Work out if multiple clients from some browser? 931 if (polling && _multiFrameInterval>0 && client.getBrowserId()!=null) 932 { 933 List<String> clients=clientsOnBrowser(client.getBrowserId()); 934 int count=clients.size(); 935 if (count>1) 936 { 937 polling=clients.get(0).equals(client.getId()); 938 advice=client.getAdvice(); 939 if (advice==null) 940 advice=_multiFrameAdvice; 941 else // could probably cache this 942 advice=multiFrameAdvice((JSON.Literal)advice); 943 } 944 } 945 946 synchronized(this) 947 { 948 if (advice==null) 949 { 950 if (_adviceVersion!=client._adviseVersion) 951 { 952 advice=_advice; 953 client._adviseVersion=_adviceVersion; 954 } 955 } 956 else 957 client._adviseVersion=-1; // clear so it is reset after multi state clears 958 } 959 960 // reply to connect message 961 String id=message.getId(); 962 963 Message reply=newMessage(message); 964 965 reply.put(CHANNEL_FIELD,META_CONNECT); 966 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE); 967 if (advice!=null) 968 reply.put(ADVICE_FIELD,advice); 969 if (id!=null) 970 reply.put(ID_FIELD,id); 971 972 if (polling) 973 transport.setPollReply(reply); 974 else 975 { 976 for (Extension e:_extensions) 977 reply=e.sendMeta(reply); 978 transport.send(reply); 979 } 980 } 981 } 982 983 /* ------------------------------------------------------------ */ 984 /* ------------------------------------------------------------ */ 985 protected class DisconnectHandler extends Handler 986 { 987 @Override getMetaChannelId()988 ChannelId getMetaChannelId() 989 { 990 return META_DISCONNECT_ID; 991 } 992 993 @Override handle(ClientImpl client, Transport transport, Message message)994 public void handle(ClientImpl client, Transport transport, Message message) throws IOException 995 { 996 if (client==null) 997 { 998 unknownClient(transport,META_DISCONNECT); 999 return; 1000 } 1001 if (isLogInfo()) 1002 logInfo("Disconnect "+client.getId()); 1003 1004 client.remove(false); 1005 1006 Message reply=newMessage(message); 1007 reply.put(CHANNEL_FIELD,META_DISCONNECT); 1008 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE); 1009 String id=message.getId(); 1010 if (id!=null) 1011 reply.put(ID_FIELD,id); 1012 1013 for (Extension e:_extensions) 1014 reply=e.sendMeta(reply); 1015 1016 Message pollReply = transport.getPollReply(); 1017 if (pollReply!=null) 1018 { 1019 for (Extension e:_extensions) 1020 pollReply=e.sendMeta(pollReply); 1021 transport.send(pollReply); 1022 transport.setPollReply(null); 1023 } 1024 transport.send(reply); 1025 } 1026 } 1027 1028 1029 /* ------------------------------------------------------------ */ 1030 /* ------------------------------------------------------------ */ 1031 protected class HandshakeHandler extends Handler 1032 { 1033 @Override getMetaChannelId()1034 ChannelId getMetaChannelId() 1035 { 1036 return META_HANDSHAKE_ID; 1037 } 1038 1039 @Override handle(ClientImpl client, Transport transport, Message message)1040 public void handle(ClientImpl client, Transport transport, Message message) throws IOException 1041 { 1042 if (client!=null) 1043 throw new IllegalStateException(); 1044 1045 if (_securityPolicy!=null && !_securityPolicy.canHandshake(message)) 1046 { 1047 Message reply=newMessage(message); 1048 reply.put(CHANNEL_FIELD,META_HANDSHAKE); 1049 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE); 1050 reply.put(ERROR_FIELD,"403::Handshake denied"); 1051 1052 for (Extension e:_extensions) 1053 reply=e.sendMeta(reply); 1054 1055 transport.send(reply); 1056 return; 1057 } 1058 1059 client=newRemoteClient(); 1060 1061 Map<?,?> ext = (Map<?,?>)message.get(EXT_FIELD); 1062 1063 boolean commented=_JSONCommented && ext!=null && Boolean.TRUE.equals(ext.get("json-comment-filtered")); 1064 1065 Message reply=newMessage(message); 1066 reply.put(CHANNEL_FIELD,META_HANDSHAKE); 1067 reply.put("version","1.0"); 1068 reply.put("minimumVersion","0.9"); 1069 if (isJSONCommented()) 1070 reply.put(EXT_FIELD,EXT_JSON_COMMENTED); 1071 1072 if (client!=null) 1073 { 1074 reply.put("supportedConnectionTypes",_transports); 1075 reply.put("successful",Boolean.TRUE); 1076 reply.put(CLIENT_FIELD,client.getId()); 1077 if (_advice!=null) 1078 reply.put(ADVICE_FIELD,_advice); 1079 client.setJSONCommented(commented); 1080 transport.setJSONCommented(commented); 1081 } 1082 else 1083 { 1084 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); 1085 if (_advice!=null) 1086 reply.put(ADVICE_FIELD,_advice); 1087 } 1088 1089 if (isLogDebug()) 1090 logDebug("handshake.handle: reply="+reply); 1091 1092 String id=message.getId(); 1093 if (id!=null) 1094 reply.put(ID_FIELD,id); 1095 1096 for (Extension e:_extensions) 1097 reply=e.sendMeta(reply); 1098 transport.send(reply); 1099 } 1100 } 1101 1102 /* ------------------------------------------------------------ */ 1103 /* ------------------------------------------------------------ */ 1104 protected class PublishHandler extends Handler 1105 { 1106 @Override getMetaChannelId()1107 ChannelId getMetaChannelId() 1108 { 1109 return null; 1110 } 1111 1112 @Override handle(ClientImpl client, Transport transport, Message message)1113 public void handle(ClientImpl client, Transport transport, Message message) throws IOException 1114 { 1115 String channel_id=message.getChannel(); 1116 1117 if (client==null && message.containsKey(CLIENT_FIELD)) 1118 { 1119 unknownClient(transport,channel_id); 1120 return; 1121 } 1122 1123 String id=message.getId(); 1124 1125 ChannelId cid=getChannelId(channel_id); 1126 Object data=message.get(Bayeux.DATA_FIELD); 1127 1128 Message reply=newMessage(message); 1129 reply.put(CHANNEL_FIELD,channel_id); 1130 if (id!=null) 1131 reply.put(ID_FIELD,id); 1132 1133 if (data!=null&&_securityPolicy.canPublish(client,channel_id,message)) 1134 { 1135 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE); 1136 1137 for (Extension e:_extensions) 1138 reply=e.sendMeta(reply); 1139 1140 transport.send(reply); 1141 if (_directDeliver) 1142 { 1143 message.remove(CLIENT_FIELD); 1144 for (Extension e:_extensions) 1145 message=e.send(message); 1146 _root.doDelivery(cid,client,message); 1147 } 1148 else 1149 doPublish(cid,client,data,id==null?null:id); 1150 } 1151 else 1152 { 1153 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE); 1154 reply.put(ERROR_FIELD,"403::Publish denied"); 1155 1156 for (Extension e:_extensions) 1157 reply=e.sendMeta(reply); 1158 transport.send(reply); 1159 } 1160 } 1161 } 1162 1163 /* ------------------------------------------------------------ */ 1164 /* ------------------------------------------------------------ */ 1165 protected class MetaPublishHandler extends Handler 1166 { 1167 @Override getMetaChannelId()1168 ChannelId getMetaChannelId() 1169 { 1170 return null; 1171 } 1172 1173 @Override handle(ClientImpl client, Transport transport, Message message)1174 public void handle(ClientImpl client, Transport transport, Message message) throws IOException 1175 { 1176 String channel_id=message.getChannel(); 1177 1178 if (client==null && !META_HANDSHAKE.equals(channel_id)) 1179 { 1180 // unknown client 1181 return; 1182 } 1183 1184 if(_securityPolicy.canPublish(client,channel_id,message)) 1185 { 1186 _root.doDelivery(getChannelId(channel_id),client,message); 1187 } 1188 } 1189 } 1190 1191 /* ------------------------------------------------------------ */ 1192 /* ------------------------------------------------------------ */ 1193 protected class SubscribeHandler extends Handler 1194 { 1195 @Override getMetaChannelId()1196 ChannelId getMetaChannelId() 1197 { 1198 return META_SUBSCRIBE_ID; 1199 } 1200 1201 @Override handle(ClientImpl client, Transport transport, Message message)1202 public void handle(ClientImpl client, Transport transport, Message message) throws IOException 1203 { 1204 if (client==null) 1205 { 1206 unknownClient(transport,META_SUBSCRIBE); 1207 return; 1208 } 1209 1210 String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD); 1211 1212 // select a random channel ID if none specifified 1213 if (subscribe_id==null) 1214 { 1215 subscribe_id=Long.toString(getRandom(),36); 1216 while (getChannel(subscribe_id)!=null) 1217 subscribe_id=Long.toString(getRandom(),36); 1218 } 1219 1220 ChannelId cid=null; 1221 boolean can_subscribe=false; 1222 1223 if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH)) 1224 { 1225 can_subscribe=true; 1226 } 1227 else if (subscribe_id.startsWith(Bayeux.META_SLASH)) 1228 { 1229 can_subscribe=false; 1230 } 1231 else 1232 { 1233 cid=getChannelId(subscribe_id); 1234 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message); 1235 } 1236 1237 Message reply=newMessage(message); 1238 reply.put(CHANNEL_FIELD,META_SUBSCRIBE); 1239 reply.put(SUBSCRIPTION_FIELD,subscribe_id); 1240 1241 if (can_subscribe) 1242 { 1243 if (cid!=null) 1244 { 1245 ChannelImpl channel=getChannel(cid); 1246 if (channel==null&&_securityPolicy.canCreate(client,subscribe_id,message)) 1247 channel=(ChannelImpl)getChannel(subscribe_id, true); 1248 1249 if (channel!=null) 1250 channel.subscribe(client); 1251 else 1252 can_subscribe=false; 1253 } 1254 1255 if (can_subscribe) 1256 { 1257 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE); 1258 } 1259 else 1260 { 1261 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE); 1262 reply.put(ERROR_FIELD,"403::cannot create"); 1263 } 1264 } 1265 else 1266 { 1267 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE); 1268 reply.put(ERROR_FIELD,"403::cannot subscribe"); 1269 1270 } 1271 1272 String id=message.getId(); 1273 if (id!=null) 1274 reply.put(ID_FIELD,id); 1275 for (Extension e:_extensions) 1276 reply=e.sendMeta(reply); 1277 transport.send(reply); 1278 } 1279 } 1280 1281 /* ------------------------------------------------------------ */ 1282 /* ------------------------------------------------------------ */ 1283 protected class UnsubscribeHandler extends Handler 1284 { 1285 @Override getMetaChannelId()1286 ChannelId getMetaChannelId() 1287 { 1288 return META_UNSUBSCRIBE_ID; 1289 } 1290 1291 @Override handle(ClientImpl client, Transport transport, Message message)1292 public void handle(ClientImpl client, Transport transport, Message message) throws IOException 1293 { 1294 if (client==null) 1295 { 1296 unknownClient(transport,META_UNSUBSCRIBE); 1297 return; 1298 } 1299 1300 String channel_id=(String)message.get(SUBSCRIPTION_FIELD); 1301 ChannelImpl channel=getChannel(channel_id); 1302 if (channel!=null) 1303 channel.unsubscribe(client); 1304 1305 Message reply=newMessage(message); 1306 reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE); 1307 if (channel!=null) 1308 { 1309 channel.unsubscribe(client); 1310 reply.put(SUBSCRIPTION_FIELD,channel.getId()); 1311 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE); 1312 } 1313 else 1314 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE); 1315 1316 String id=message.getId(); 1317 if (id!=null) 1318 reply.put(ID_FIELD,id); 1319 for (Extension e:_extensions) 1320 reply=e.sendMeta(reply); 1321 transport.send(reply); 1322 } 1323 } 1324 1325 /* ------------------------------------------------------------ */ 1326 /* ------------------------------------------------------------ */ 1327 protected class PingHandler extends Handler 1328 { 1329 @Override getMetaChannelId()1330 ChannelId getMetaChannelId() 1331 { 1332 return META_PING_ID; 1333 } 1334 1335 @Override handle(ClientImpl client, Transport transport, Message message)1336 public void handle(ClientImpl client, Transport transport, Message message) throws IOException 1337 { 1338 Message reply=newMessage(message); 1339 reply.put(CHANNEL_FIELD,META_PING); 1340 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE); 1341 1342 String id=message.getId(); 1343 if (id!=null) 1344 reply.put(ID_FIELD,id); 1345 for (Extension e:_extensions) 1346 reply=e.sendMeta(reply); 1347 transport.send(reply); 1348 } 1349 } 1350 1351 1352 /* ------------------------------------------------------------ */ 1353 /* ------------------------------------------------------------ */ 1354 protected class ServiceChannel extends ChannelImpl 1355 { ServiceChannel(String id)1356 ServiceChannel(String id) 1357 { 1358 super(id,AbstractBayeux.this); 1359 } 1360 1361 /* ------------------------------------------------------------ */ 1362 /* (non-Javadoc) 1363 * @see org.mortbay.cometd.ChannelImpl#addChild(org.mortbay.cometd.ChannelImpl) 1364 */ 1365 @Override addChild(ChannelImpl channel)1366 public void addChild(ChannelImpl channel) 1367 { 1368 super.addChild(channel); 1369 setPersistent(true); 1370 } 1371 1372 /* ------------------------------------------------------------ */ 1373 @Override subscribe(Client client)1374 public void subscribe(Client client) 1375 { 1376 if (client.isLocal()) 1377 super.subscribe(client); 1378 } 1379 1380 } 1381 } 1382