1 //======================================================================== 2 //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd. 3 //------------------------------------------------------------------------ 4 //Licensed under the Apache License, Version 2.0 (the "License"); 5 //you may not use this file except in compliance with the License. 6 //You may obtain a copy of the License at 7 //http://www.apache.org/licenses/LICENSE-2.0 8 //Unless required by applicable law or agreed to in writing, software 9 //distributed under the License is distributed on an "AS IS" BASIS, 10 //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 //See the License for the specific language governing permissions and 12 //limitations under the License. 13 //======================================================================== 14 15 package org.mortbay.io.nio; 16 17 import java.io.IOException; 18 import java.nio.channels.CancelledKeyException; 19 import java.nio.channels.SelectionKey; 20 import java.nio.channels.Selector; 21 import java.nio.channels.ServerSocketChannel; 22 import java.nio.channels.SocketChannel; 23 import java.util.ArrayList; 24 import java.util.Iterator; 25 import java.util.List; 26 27 import org.mortbay.component.AbstractLifeCycle; 28 import org.mortbay.component.LifeCycle; 29 import org.mortbay.io.Connection; 30 import org.mortbay.io.EndPoint; 31 import org.mortbay.log.Log; 32 import org.mortbay.thread.Timeout; 33 34 35 /* ------------------------------------------------------------ */ 36 /** 37 * The Selector Manager manages and number of SelectSets to allow 38 * NIO scheduling to scale to large numbers of connections. 39 * 40 * @author gregw 41 * 42 */ 43 public abstract class SelectorManager extends AbstractLifeCycle 44 { 45 private boolean _delaySelectKeyUpdate=true; 46 private long _maxIdleTime; 47 private long _lowResourcesConnections; 48 private long _lowResourcesMaxIdleTime; 49 private transient SelectSet[] _selectSet; 50 private int _selectSets=1; 51 private volatile int _set; 52 53 54 /* ------------------------------------------------------------ */ 55 /** 56 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. 57 * @see {@link #setLowResourcesMaxIdleTime(long)} 58 */ setMaxIdleTime(long maxIdleTime)59 public void setMaxIdleTime(long maxIdleTime) 60 { 61 _maxIdleTime=maxIdleTime; 62 } 63 64 /* ------------------------------------------------------------ */ 65 /** 66 * @param selectSets 67 */ setSelectSets(int selectSets)68 public void setSelectSets(int selectSets) 69 { 70 long lrc = _lowResourcesConnections * _selectSets; 71 _selectSets=selectSets; 72 _lowResourcesConnections=lrc/_selectSets; 73 } 74 75 /* ------------------------------------------------------------ */ 76 /** 77 * @return 78 */ getMaxIdleTime()79 public long getMaxIdleTime() 80 { 81 return _maxIdleTime; 82 } 83 84 /* ------------------------------------------------------------ */ 85 /** 86 * @return 87 */ getSelectSets()88 public int getSelectSets() 89 { 90 return _selectSets; 91 } 92 93 /* ------------------------------------------------------------ */ 94 /** 95 * @return 96 */ isDelaySelectKeyUpdate()97 public boolean isDelaySelectKeyUpdate() 98 { 99 return _delaySelectKeyUpdate; 100 } 101 102 /* ------------------------------------------------------------ */ 103 /** Register a channel 104 * @param channel 105 * @param att Attached Object 106 * @throws IOException 107 */ register(SocketChannel channel, Object att)108 public void register(SocketChannel channel, Object att) throws IOException 109 { 110 int s=_set++; 111 s=s%_selectSets; 112 SelectSet[] sets=_selectSet; 113 if (sets!=null) 114 { 115 SelectSet set=sets[s]; 116 set.addChange(channel,att); 117 set.wakeup(); 118 } 119 } 120 121 /* ------------------------------------------------------------ */ 122 /** Register a serverchannel 123 * @param acceptChannel 124 * @return 125 * @throws IOException 126 */ register(ServerSocketChannel acceptChannel)127 public void register(ServerSocketChannel acceptChannel) throws IOException 128 { 129 int s=_set++; 130 s=s%_selectSets; 131 SelectSet set=_selectSet[s]; 132 set.addChange(acceptChannel); 133 set.wakeup(); 134 } 135 136 /* ------------------------------------------------------------ */ 137 /** 138 * @return the lowResourcesConnections 139 */ getLowResourcesConnections()140 public long getLowResourcesConnections() 141 { 142 return _lowResourcesConnections*_selectSets; 143 } 144 145 /* ------------------------------------------------------------ */ 146 /** 147 * Set the number of connections, which if exceeded places this manager in low resources state. 148 * This is not an exact measure as the connection count is averaged over the select sets. 149 * @param lowResourcesConnections the number of connections 150 * @see {@link #setLowResourcesMaxIdleTime(long)} 151 */ setLowResourcesConnections(long lowResourcesConnections)152 public void setLowResourcesConnections(long lowResourcesConnections) 153 { 154 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; 155 } 156 157 /* ------------------------------------------------------------ */ 158 /** 159 * @return the lowResourcesMaxIdleTime 160 */ getLowResourcesMaxIdleTime()161 public long getLowResourcesMaxIdleTime() 162 { 163 return _lowResourcesMaxIdleTime; 164 } 165 166 /* ------------------------------------------------------------ */ 167 /** 168 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} 169 * @see {@link #setMaxIdleTime(long)} 170 */ setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)171 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) 172 { 173 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime; 174 } 175 176 /* ------------------------------------------------------------ */ 177 /** 178 * @param acceptorID 179 * @throws IOException 180 */ doSelect(int acceptorID)181 public void doSelect(int acceptorID) throws IOException 182 { 183 SelectSet[] sets= _selectSet; 184 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null) 185 sets[acceptorID].doSelect(); 186 } 187 188 189 /* ------------------------------------------------------------ */ 190 /** 191 * @param delaySelectKeyUpdate 192 */ setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)193 public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate) 194 { 195 _delaySelectKeyUpdate=delaySelectKeyUpdate; 196 } 197 198 /* ------------------------------------------------------------ */ 199 /** 200 * @param key 201 * @return 202 * @throws IOException 203 */ acceptChannel(SelectionKey key)204 protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException; 205 206 /* ------------------------------------------------------------------------------- */ dispatch(Runnable task)207 public abstract boolean dispatch(Runnable task) throws IOException; 208 209 /* ------------------------------------------------------------ */ 210 /* (non-Javadoc) 211 * @see org.mortbay.component.AbstractLifeCycle#doStart() 212 */ doStart()213 protected void doStart() throws Exception 214 { 215 _selectSet = new SelectSet[_selectSets]; 216 for (int i=0;i<_selectSet.length;i++) 217 _selectSet[i]= new SelectSet(i); 218 219 super.doStart(); 220 } 221 222 223 /* ------------------------------------------------------------------------------- */ doStop()224 protected void doStop() throws Exception 225 { 226 SelectSet[] sets= _selectSet; 227 _selectSet=null; 228 if (sets!=null) 229 for (int i=0;i<sets.length;i++) 230 { 231 SelectSet set = sets[i]; 232 if (set!=null) 233 set.stop(); 234 } 235 super.doStop(); 236 } 237 238 /* ------------------------------------------------------------ */ 239 /** 240 * @param endpoint 241 */ endPointClosed(SelectChannelEndPoint endpoint)242 protected abstract void endPointClosed(SelectChannelEndPoint endpoint); 243 244 /* ------------------------------------------------------------ */ 245 /** 246 * @param endpoint 247 */ endPointOpened(SelectChannelEndPoint endpoint)248 protected abstract void endPointOpened(SelectChannelEndPoint endpoint); 249 250 /* ------------------------------------------------------------------------------- */ newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)251 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint); 252 253 /* ------------------------------------------------------------ */ 254 /** 255 * @param channel 256 * @param selectSet 257 * @param sKey 258 * @return 259 * @throws IOException 260 */ newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey)261 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; 262 263 /* ------------------------------------------------------------------------------- */ connectionFailed(SocketChannel channel,Throwable ex,Object attachment)264 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) 265 { 266 Log.warn(ex); 267 } 268 269 /* ------------------------------------------------------------------------------- */ 270 /* ------------------------------------------------------------------------------- */ 271 /* ------------------------------------------------------------------------------- */ 272 public class SelectSet 273 { 274 private transient int _change; 275 private transient List[] _changes; 276 private transient Timeout _idleTimeout; 277 private transient int _nextSet; 278 private transient Timeout _retryTimeout; 279 private transient Selector _selector; 280 private transient int _setID; 281 private transient boolean _selecting; 282 private transient int _jvmBug; 283 284 /* ------------------------------------------------------------ */ SelectSet(int acceptorID)285 SelectSet(int acceptorID) throws Exception 286 { 287 _setID=acceptorID; 288 289 _idleTimeout = new Timeout(this); 290 _idleTimeout.setDuration(getMaxIdleTime()); 291 _retryTimeout = new Timeout(this); 292 _retryTimeout.setDuration(0L); 293 294 // create a selector; 295 _selector = Selector.open(); 296 _changes = new ArrayList[] {new ArrayList(),new ArrayList()}; 297 _change=0; 298 } 299 300 /* ------------------------------------------------------------ */ addChange(Object point)301 public void addChange(Object point) 302 { 303 synchronized (_changes) 304 { 305 _changes[_change].add(point); 306 if (point instanceof SocketChannel) 307 _changes[_change].add(null); 308 } 309 } 310 311 /* ------------------------------------------------------------ */ addChange(SocketChannel channel, Object att)312 public void addChange(SocketChannel channel, Object att) 313 { 314 synchronized (_changes) 315 { 316 _changes[_change].add(channel); 317 _changes[_change].add(att); 318 } 319 } 320 321 /* ------------------------------------------------------------ */ cancelIdle(Timeout.Task task)322 public void cancelIdle(Timeout.Task task) 323 { 324 synchronized (this) 325 { 326 task.cancel(); 327 } 328 } 329 330 /* ------------------------------------------------------------ */ 331 /** 332 * Select and dispatch tasks found from changes and the selector. 333 * 334 * @throws IOException 335 */ doSelect()336 public void doSelect() throws IOException 337 { 338 SelectionKey key=null; 339 340 try 341 { 342 List changes; 343 synchronized (_changes) 344 { 345 changes=_changes[_change]; 346 _change=_change==0?1:0; 347 _selecting=true; 348 } 349 350 // Make any key changes required 351 for (int i = 0; i < changes.size(); i++) 352 { 353 try 354 { 355 Object o = changes.get(i); 356 if (o instanceof EndPoint) 357 { 358 // Update the operations for a key. 359 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o; 360 endpoint.doUpdateKey(); 361 } 362 else if (o instanceof Runnable) 363 { 364 dispatch((Runnable)o); 365 } 366 else if (o instanceof SocketChannel) 367 { 368 // finish accepting/connecting this connection 369 SocketChannel channel=(SocketChannel)o; 370 Object att = changes.get(++i); 371 372 if (channel.isConnected()) 373 { 374 key = channel.register(_selector,SelectionKey.OP_READ,att); 375 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); 376 key.attach(endpoint); 377 endpoint.dispatch(); 378 } 379 else 380 { 381 channel.register(_selector,SelectionKey.OP_CONNECT,att); 382 } 383 384 } 385 else if (o instanceof ServerSocketChannel) 386 { 387 ServerSocketChannel channel = (ServerSocketChannel)o; 388 channel.register(getSelector(),SelectionKey.OP_ACCEPT); 389 } 390 else 391 throw new IllegalArgumentException(o.toString()); 392 } 393 catch (CancelledKeyException e) 394 { 395 if (isRunning()) 396 Log.warn(e); 397 else 398 Log.debug(e); 399 } 400 } 401 changes.clear(); 402 403 long idle_next = 0; 404 long retry_next = 0; 405 long now=System.currentTimeMillis(); 406 synchronized (this) 407 { 408 _idleTimeout.setNow(now); 409 _retryTimeout.setNow(now); 410 if (_lowResourcesConnections>0 && _selector.keys().size()>_lowResourcesConnections) 411 _idleTimeout.setDuration(_lowResourcesMaxIdleTime); 412 else 413 _idleTimeout.setDuration(_maxIdleTime); 414 idle_next=_idleTimeout.getTimeToNext(); 415 retry_next=_retryTimeout.getTimeToNext(); 416 } 417 418 // workout how low to wait in select 419 long wait = 1000L; // not getMaxIdleTime() as the now value of the idle timers needs to be updated. 420 if (idle_next >= 0 && wait > idle_next) 421 wait = idle_next; 422 if (wait > 0 && retry_next >= 0 && wait > retry_next) 423 wait = retry_next; 424 425 // Do the select. 426 if (wait > 10) // TODO tune or configure this 427 { 428 long before=now; 429 int selected=_selector.select(wait); 430 now = System.currentTimeMillis(); 431 _idleTimeout.setNow(now); 432 _retryTimeout.setNow(now); 433 434 // Look for JVM bug 435 if (selected==0 && wait>0 && (now-before)<wait/2 && _selector.selectedKeys().size()==0) 436 { 437 if (_jvmBug++>5) // TODO tune or configure this 438 { 439 // Probably JVM BUG! 440 441 Iterator iter = _selector.keys().iterator(); 442 while(iter.hasNext()) 443 { 444 key = (SelectionKey) iter.next(); 445 if (key.isValid()&&key.interestOps()==0) 446 { 447 key.cancel(); 448 } 449 } 450 try 451 { 452 Thread.sleep(20); // tune or configure this 453 } 454 catch (InterruptedException e) 455 { 456 Log.ignore(e); 457 } 458 } 459 } 460 else 461 _jvmBug=0; 462 } 463 else 464 { 465 _selector.selectNow(); 466 _jvmBug=0; 467 } 468 469 // have we been destroyed while sleeping\ 470 if (_selector==null || !_selector.isOpen()) 471 return; 472 473 // Look for things to do 474 Iterator iter = _selector.selectedKeys().iterator(); 475 while (iter.hasNext()) 476 { 477 key = (SelectionKey) iter.next(); 478 479 try 480 { 481 if (!key.isValid()) 482 { 483 key.cancel(); 484 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); 485 if (endpoint != null) 486 endpoint.doUpdateKey(); 487 continue; 488 } 489 490 Object att = key.attachment(); 491 if (att instanceof SelectChannelEndPoint) 492 { 493 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att; 494 endpoint.dispatch(); 495 } 496 else if (key.isAcceptable()) 497 { 498 SocketChannel channel = acceptChannel(key); 499 if (channel==null) 500 continue; 501 502 channel.configureBlocking(false); 503 504 // TODO make it reluctant to leave 0 505 _nextSet=++_nextSet%_selectSet.length; 506 507 // Is this for this selectset 508 if (_nextSet==_setID) 509 { 510 // bind connections to this select set. 511 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ); 512 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey); 513 cKey.attach(endpoint); 514 if (endpoint != null) 515 endpoint.dispatch(); 516 } 517 else 518 { 519 // nope - give it to another. 520 _selectSet[_nextSet].addChange(channel); 521 _selectSet[_nextSet].wakeup(); 522 } 523 } 524 else if (key.isConnectable()) 525 { 526 // Complete a connection of a registered channel 527 SocketChannel channel = (SocketChannel)key.channel(); 528 boolean connected=false; 529 try 530 { 531 connected=channel.finishConnect(); 532 } 533 catch(Exception e) 534 { 535 connectionFailed(channel,e,att); 536 } 537 finally 538 { 539 if (connected) 540 { 541 key.interestOps(SelectionKey.OP_READ); 542 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); 543 key.attach(endpoint); 544 endpoint.dispatch(); 545 } 546 else 547 { 548 key.cancel(); 549 } 550 } 551 } 552 else 553 { 554 // Wrap readable registered channel in an endpoint 555 SocketChannel channel = (SocketChannel)key.channel(); 556 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); 557 key.attach(endpoint); 558 if (key.isReadable()) 559 endpoint.dispatch(); 560 } 561 key = null; 562 } 563 catch (CancelledKeyException e) 564 { 565 Log.ignore(e); 566 } 567 catch (Exception e) 568 { 569 if (isRunning()) 570 Log.warn(e); 571 else 572 Log.ignore(e); 573 574 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) 575 { 576 key.interestOps(0); 577 578 key.cancel(); 579 } 580 } 581 } 582 583 // Everything always handled 584 _selector.selectedKeys().clear(); 585 586 // tick over the timers 587 _idleTimeout.tick(now); 588 _retryTimeout.tick(now); 589 590 } 591 catch (CancelledKeyException e) 592 { 593 Log.ignore(e); 594 } 595 finally 596 { 597 synchronized(this) 598 { 599 _selecting=false; 600 } 601 } 602 } 603 604 /* ------------------------------------------------------------ */ getManager()605 public SelectorManager getManager() 606 { 607 return SelectorManager.this; 608 } 609 610 /* ------------------------------------------------------------ */ getNow()611 public long getNow() 612 { 613 return _idleTimeout.getNow(); 614 } 615 616 /* ------------------------------------------------------------ */ scheduleIdle(Timeout.Task task)617 public void scheduleIdle(Timeout.Task task) 618 { 619 synchronized (this) 620 { 621 if (_idleTimeout.getDuration() <= 0) 622 return; 623 624 task.schedule(_idleTimeout); 625 } 626 } 627 628 /* ------------------------------------------------------------ */ scheduleTimeout(Timeout.Task task, long timeout)629 public void scheduleTimeout(Timeout.Task task, long timeout) 630 { 631 synchronized (this) 632 { 633 _retryTimeout.schedule(task, timeout); 634 } 635 } 636 637 /* ------------------------------------------------------------ */ wakeup()638 public void wakeup() 639 { 640 Selector selector = _selector; 641 if (selector!=null) 642 selector.wakeup(); 643 } 644 645 /* ------------------------------------------------------------ */ getSelector()646 Selector getSelector() 647 { 648 return _selector; 649 } 650 651 /* ------------------------------------------------------------ */ stop()652 void stop() throws Exception 653 { 654 boolean selecting=true; 655 while(selecting) 656 { 657 wakeup(); 658 synchronized (this) 659 { 660 selecting=_selecting; 661 } 662 } 663 664 ArrayList keys=new ArrayList(_selector.keys()); 665 Iterator iter =keys.iterator(); 666 667 while (iter.hasNext()) 668 { 669 SelectionKey key = (SelectionKey)iter.next(); 670 if (key==null) 671 continue; 672 EndPoint endpoint = (EndPoint)key.attachment(); 673 if (endpoint!=null) 674 { 675 try 676 { 677 endpoint.close(); 678 } 679 catch(IOException e) 680 { 681 Log.ignore(e); 682 } 683 } 684 } 685 686 synchronized (this) 687 { 688 _idleTimeout.cancelAll(); 689 _retryTimeout.cancelAll(); 690 try 691 { 692 if (_selector != null) 693 _selector.close(); 694 } 695 catch (IOException e) 696 { 697 Log.ignore(e); 698 } 699 _selector=null; 700 } 701 } 702 } 703 704 }