1 // ======================================================================== 2 // Copyright 2006 Mort Bay Consulting Pty. Ltd. 3 // ------------------------------------------------------------------------ 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 //======================================================================== 14 15 package org.mortbay.cometd; 16 17 import java.util.ArrayList; 18 import java.util.Collections; 19 import java.util.List; 20 import java.util.Queue; 21 22 import org.cometd.Bayeux; 23 import org.cometd.Client; 24 import org.cometd.DeliverListener; 25 import org.cometd.Extension; 26 import org.cometd.ClientListener; 27 import org.cometd.Message; 28 import org.cometd.MessageListener; 29 import org.cometd.QueueListener; 30 import org.cometd.RemoveListener; 31 import org.mortbay.util.ArrayQueue; 32 import org.mortbay.util.LazyList; 33 import org.mortbay.util.ajax.JSON; 34 35 36 37 /* ------------------------------------------------------------ */ 38 /** 39 * 40 * @author gregw 41 */ 42 public class ClientImpl implements Client 43 { 44 private String _id; 45 private String _type; 46 private int _responsesPending; 47 private ChannelImpl[] _subscriptions=new ChannelImpl[0]; // copy on write 48 private boolean _JSONCommented; 49 private RemoveListener[] _rListeners; // copy on write 50 private MessageListener[] _syncMListeners; // copy on write 51 private MessageListener[] _asyncMListeners; // copy on write 52 private QueueListener[] _qListeners; // copy on write 53 private DeliverListener[] _dListeners; // copy on write 54 protected AbstractBayeux _bayeux; 55 private String _browserId; 56 private JSON.Literal _advice; 57 private int _batch; 58 private int _maxQueue; 59 private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this); 60 private long _timeout; 61 62 // manipulated and synchronized by AbstractBayeux 63 int _adviseVersion; 64 65 /* ------------------------------------------------------------ */ ClientImpl(AbstractBayeux bayeux)66 protected ClientImpl(AbstractBayeux bayeux) 67 { 68 _bayeux=bayeux; 69 _maxQueue=bayeux.getMaxClientQueue(); 70 _bayeux.addClient(this,null); 71 if (_bayeux.isLogInfo()) 72 _bayeux.logInfo("newClient: "+this); 73 } 74 75 /* ------------------------------------------------------------ */ ClientImpl(AbstractBayeux bayeux, String idPrefix)76 protected ClientImpl(AbstractBayeux bayeux, String idPrefix) 77 { 78 _bayeux=bayeux; 79 _maxQueue=0; 80 81 _bayeux.addClient(this,idPrefix); 82 83 if (_bayeux.isLogInfo()) 84 _bayeux.logInfo("newClient: "+this); 85 86 } 87 88 /* ------------------------------------------------------------ */ deliver(Client from, String toChannel, Object data, String id)89 public void deliver(Client from, String toChannel, Object data, String id) 90 { 91 // TODO recycle maps 92 Message message=_bayeux.newMessage(); 93 message.put(Bayeux.CHANNEL_FIELD,toChannel); 94 message.put(Bayeux.DATA_FIELD,data); 95 if (id!=null) 96 message.put(Bayeux.ID_FIELD,id); 97 98 for (Extension e:_bayeux._extensions) 99 message=e.send(message); 100 doDelivery(from,message); 101 102 ((MessageImpl)message).decRef(); 103 } 104 105 /* ------------------------------------------------------------ */ doDelivery(Client from, Message message)106 protected void doDelivery(Client from, Message message) 107 { 108 MessageListener[] alisteners=null; 109 synchronized(this) 110 { 111 ((MessageImpl)message).incRef(); 112 113 if (_maxQueue<0) 114 { 115 _queue.addUnsafe(message); 116 } 117 else 118 { 119 boolean add=_maxQueue>0; 120 if (_queue.size()>=_maxQueue && _qListeners!=null) 121 { 122 for (QueueListener l : _qListeners) 123 { 124 add &= l.queueMaxed((Client)this,message); 125 } 126 } 127 128 if (add) 129 _queue.addUnsafe(message); 130 } 131 132 // deliver unsynchronized 133 if (_syncMListeners!=null) 134 for (MessageListener l:_syncMListeners) 135 l.deliver(from,this,message); 136 alisteners=_asyncMListeners; 137 138 if (_batch==0 && _responsesPending<1 && _queue.size()>0) 139 resume(); 140 } 141 142 // deliver unsynchronized 143 if (alisteners!=null) 144 for (MessageListener l:alisteners) 145 l.deliver(from,this,message); 146 } 147 148 /* ------------------------------------------------------------ */ doDeliverListeners()149 public void doDeliverListeners() 150 { 151 synchronized (this) 152 { 153 if (_dListeners!=null) 154 for (DeliverListener l:_dListeners) 155 l.deliver(this,_queue); 156 } 157 } 158 159 160 /* ------------------------------------------------------------ */ startBatch()161 public void startBatch() 162 { 163 synchronized(this) 164 { 165 _batch++; 166 } 167 } 168 169 /* ------------------------------------------------------------ */ endBatch()170 public void endBatch() 171 { 172 synchronized(this) 173 { 174 if (--_batch==0 && _queue.size()>0 && _responsesPending<1) 175 resume(); 176 } 177 } 178 179 /* ------------------------------------------------------------ */ getConnectionType()180 public String getConnectionType() 181 { 182 return _type; 183 } 184 185 /* ------------------------------------------------------------ */ 186 /* (non-Javadoc) 187 * @see org.mortbay.cometd.C#getId() 188 */ getId()189 public String getId() 190 { 191 return _id; 192 } 193 194 /* ------------------------------------------------------------ */ hasMessages()195 public boolean hasMessages() 196 { 197 return _queue.size()>0; 198 } 199 200 /* ------------------------------------------------------------ */ 201 /** 202 * @return the commented 203 */ isJSONCommented()204 public boolean isJSONCommented() 205 { 206 synchronized(this) 207 { 208 return _JSONCommented; 209 } 210 } 211 212 /* ------------------------------------------------------------ */ isLocal()213 public boolean isLocal() 214 { 215 return true; 216 } 217 218 /* ------------------------------------------------------------ */ 219 /* ------------------------------------------------------------ */ 220 /* (non-Javadoc) 221 * @see dojox.cometd.Client#remove(boolean) 222 */ remove(boolean timeout)223 public void remove(boolean timeout) 224 { 225 synchronized(this) 226 { 227 Client client=_bayeux.removeClient(_id); 228 if (_bayeux.isLogInfo()) 229 _bayeux.logInfo("Remove client "+client+" timeout="+timeout); 230 if (_browserId!=null) 231 _bayeux.clientOffBrowser(getBrowserId(),_id); 232 _browserId=null; 233 234 if (_rListeners!=null) 235 for (RemoveListener l:_rListeners) 236 l.removed(_id, timeout); 237 } 238 resume(); 239 } 240 241 /* ------------------------------------------------------------ */ responded()242 public int responded() 243 { 244 synchronized(this) 245 { 246 return _responsesPending--; 247 } 248 } 249 250 /* ------------------------------------------------------------ */ responsePending()251 public int responsePending() 252 { 253 synchronized(this) 254 { 255 return ++_responsesPending; 256 } 257 } 258 259 /* ------------------------------------------------------------ */ 260 /** Called by deliver to resume anything waiting on this client. 261 */ resume()262 public void resume() 263 { 264 } 265 266 /* ------------------------------------------------------------ */ 267 /** 268 * @param commented the commented to set 269 */ setJSONCommented(boolean commented)270 public void setJSONCommented(boolean commented) 271 { 272 synchronized(this) 273 { 274 _JSONCommented=commented; 275 } 276 } 277 278 /* ------------------------------------------------------------ */ 279 /* 280 * @return the number of messages queued 281 */ getMessages()282 public int getMessages() 283 { 284 return _queue.size(); 285 } 286 287 /* ------------------------------------------------------------ */ takeMessages()288 public List<Message> takeMessages() 289 { 290 synchronized(this) 291 { 292 ArrayList<Message> list = new ArrayList<Message>(_queue); 293 _queue.clear(); 294 return list; 295 } 296 } 297 298 299 /* ------------------------------------------------------------ */ returnMessages(List<Message> messages)300 public void returnMessages(List<Message> messages) 301 { 302 synchronized(this) 303 { 304 _queue.addAll(0,messages); 305 } 306 } 307 308 /* ------------------------------------------------------------ */ 309 @Override toString()310 public String toString() 311 { 312 return _id; 313 } 314 315 /* ------------------------------------------------------------ */ addSubscription(ChannelImpl channel)316 protected void addSubscription(ChannelImpl channel) 317 { 318 synchronized (this) 319 { 320 _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null); 321 } 322 } 323 324 /* ------------------------------------------------------------ */ removeSubscription(ChannelImpl channel)325 protected void removeSubscription(ChannelImpl channel) 326 { 327 synchronized (this) 328 { 329 _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel); 330 } 331 } 332 333 /* ------------------------------------------------------------ */ setConnectionType(String type)334 protected void setConnectionType(String type) 335 { 336 synchronized (this) 337 { 338 _type=type; 339 } 340 } 341 342 /* ------------------------------------------------------------ */ setId(String _id)343 protected void setId(String _id) 344 { 345 synchronized (this) 346 { 347 this._id=_id; 348 } 349 } 350 351 /* ------------------------------------------------------------ */ unsubscribeAll()352 protected void unsubscribeAll() 353 { 354 ChannelImpl[] subscriptions; 355 synchronized(this) 356 { 357 _queue.clear(); 358 subscriptions=_subscriptions; 359 _subscriptions=new ChannelImpl[0]; 360 } 361 for (ChannelImpl channel : subscriptions) 362 channel.unsubscribe(this); 363 364 } 365 366 /* ------------------------------------------------------------ */ setBrowserId(String id)367 public void setBrowserId(String id) 368 { 369 if (_browserId!=null && !_browserId.equals(id)) 370 _bayeux.clientOffBrowser(_browserId,_id); 371 _browserId=id; 372 if (_browserId!=null) 373 _bayeux.clientOnBrowser(_browserId,_id); 374 } 375 376 /* ------------------------------------------------------------ */ getBrowserId()377 public String getBrowserId() 378 { 379 return _browserId; 380 } 381 382 /* ------------------------------------------------------------ */ 383 @Override equals(Object o)384 public boolean equals(Object o) 385 { 386 if (!(o instanceof Client)) 387 return false; 388 return getId().equals(((Client)o).getId()); 389 } 390 391 /* ------------------------------------------------------------ */ 392 /** 393 * Get the advice specific for this Client 394 * @return advice specific for this client or null 395 */ getAdvice()396 public JSON.Literal getAdvice() 397 { 398 return _advice; 399 } 400 401 /* ------------------------------------------------------------ */ 402 /** 403 * @param advice specific for this client 404 */ setAdvice(JSON.Literal advice)405 public void setAdvice(JSON.Literal advice) 406 { 407 _advice=advice; 408 } 409 410 411 /* ------------------------------------------------------------ */ addListener(ClientListener listener)412 public void addListener(ClientListener listener) 413 { 414 synchronized(this) 415 { 416 if (listener instanceof MessageListener) 417 { 418 if (listener instanceof MessageListener.Synchronous) 419 _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class); 420 else 421 _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class); 422 } 423 424 if (listener instanceof RemoveListener) 425 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class); 426 427 if (listener instanceof QueueListener) 428 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class); 429 430 if (listener instanceof DeliverListener) 431 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class); 432 } 433 } 434 435 /* ------------------------------------------------------------ */ removeListener(ClientListener listener)436 public void removeListener(ClientListener listener) 437 { 438 synchronized(this) 439 { 440 if (listener instanceof MessageListener) 441 { 442 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener); 443 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener); 444 } 445 446 if (listener instanceof RemoveListener) 447 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener); 448 449 if (listener instanceof QueueListener) 450 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener); 451 } 452 } 453 454 /* ------------------------------------------------------------ */ getTimeout()455 public long getTimeout() 456 { 457 return _timeout; 458 } 459 460 /* ------------------------------------------------------------ */ setTimeout(long timeoutMS)461 public void setTimeout(long timeoutMS) 462 { 463 _timeout=timeoutMS; 464 } 465 466 /* ------------------------------------------------------------ */ setMaxQueue(int maxQueue)467 public void setMaxQueue(int maxQueue) 468 { 469 _maxQueue=maxQueue; 470 } 471 472 /* ------------------------------------------------------------ */ getMaxQueue()473 public int getMaxQueue() 474 { 475 return _maxQueue; 476 } 477 478 /* ------------------------------------------------------------ */ getQueue()479 public Queue<Message> getQueue() 480 { 481 return _queue; 482 } 483 } 484