1 //========================================================================
2 //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3 //------------------------------------------------------------------------
4 //Licensed under the Apache License, Version 2.0 (the "License");
5 //you may not use this file except in compliance with the License.
6 //You may obtain a copy of the License at
7 //http://www.apache.org/licenses/LICENSE-2.0
8 //Unless required by applicable law or agreed to in writing, software
9 //distributed under the License is distributed on an "AS IS" BASIS,
10 //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 //See the License for the specific language governing permissions and
12 //limitations under the License.
13 //========================================================================
14 
15 package org.mortbay.io.nio;
16 
17 import java.io.IOException;
18 import java.nio.channels.CancelledKeyException;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.List;
26 
27 import org.mortbay.component.AbstractLifeCycle;
28 import org.mortbay.component.LifeCycle;
29 import org.mortbay.io.Connection;
30 import org.mortbay.io.EndPoint;
31 import org.mortbay.log.Log;
32 import org.mortbay.thread.Timeout;
33 
34 
35 /* ------------------------------------------------------------ */
36 /**
 * The Selector Manager manages a number of SelectSets to allow
 * NIO scheduling to scale to large numbers of connections.
39  *
40  * @author gregw
41  *
42  */
43 public abstract class SelectorManager extends AbstractLifeCycle
44 {
    // Flag exposed through isDelaySelectKeyUpdate()/setDelaySelectKeyUpdate(); not otherwise read in this class.
    private boolean _delaySelectKeyUpdate=true;
    // Idle period (ms) a SelectSet applies while it is not above the low-resources threshold.
    private long _maxIdleTime;
    // Low-resources connection threshold stored PER select set (the public accessors scale by _selectSets).
    // A value <= 0 disables the low-resources check in SelectSet.doSelect().
    private long _lowResourcesConnections;
    // Idle period (ms) a SelectSet applies while its key count exceeds _lowResourcesConnections.
    private long _lowResourcesMaxIdleTime;
    // Select sets indexed by acceptor ID; allocated in doStart() and reset to null in doStop().
    private transient SelectSet[] _selectSet;
    // Configured number of select sets; sizes _selectSet on the next doStart().
    private int _selectSets=1;
    // Round-robin counter incremented by register(...) to choose a select set.
    private volatile int _set;
52 
53 
54     /* ------------------------------------------------------------ */
55     /**
56      * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
57      * @see {@link #setLowResourcesMaxIdleTime(long)}
58      */
setMaxIdleTime(long maxIdleTime)59     public void setMaxIdleTime(long maxIdleTime)
60     {
61         _maxIdleTime=maxIdleTime;
62     }
63 
64     /* ------------------------------------------------------------ */
65     /**
66      * @param selectSets
67      */
setSelectSets(int selectSets)68     public void setSelectSets(int selectSets)
69     {
70         long lrc = _lowResourcesConnections * _selectSets;
71         _selectSets=selectSets;
72         _lowResourcesConnections=lrc/_selectSets;
73     }
74 
75     /* ------------------------------------------------------------ */
76     /**
77      * @return
78      */
getMaxIdleTime()79     public long getMaxIdleTime()
80     {
81         return _maxIdleTime;
82     }
83 
84     /* ------------------------------------------------------------ */
85     /**
86      * @return
87      */
getSelectSets()88     public int getSelectSets()
89     {
90         return _selectSets;
91     }
92 
93     /* ------------------------------------------------------------ */
94     /**
95      * @return
96      */
isDelaySelectKeyUpdate()97     public boolean isDelaySelectKeyUpdate()
98     {
99         return _delaySelectKeyUpdate;
100     }
101 
102     /* ------------------------------------------------------------ */
103     /** Register a channel
104      * @param channel
105      * @param att Attached Object
106      * @throws IOException
107      */
register(SocketChannel channel, Object att)108     public void register(SocketChannel channel, Object att) throws IOException
109     {
110         int s=_set++;
111         s=s%_selectSets;
112         SelectSet[] sets=_selectSet;
113         if (sets!=null)
114         {
115             SelectSet set=sets[s];
116             set.addChange(channel,att);
117             set.wakeup();
118         }
119     }
120 
121     /* ------------------------------------------------------------ */
122     /** Register a serverchannel
123      * @param acceptChannel
124      * @return
125      * @throws IOException
126      */
register(ServerSocketChannel acceptChannel)127     public void register(ServerSocketChannel acceptChannel) throws IOException
128     {
129         int s=_set++;
130         s=s%_selectSets;
131         SelectSet set=_selectSet[s];
132         set.addChange(acceptChannel);
133         set.wakeup();
134     }
135 
136     /* ------------------------------------------------------------ */
137     /**
138      * @return the lowResourcesConnections
139      */
getLowResourcesConnections()140     public long getLowResourcesConnections()
141     {
142         return _lowResourcesConnections*_selectSets;
143     }
144 
145     /* ------------------------------------------------------------ */
146     /**
147      * Set the number of connections, which if exceeded places this manager in low resources state.
148      * This is not an exact measure as the connection count is averaged over the select sets.
149      * @param lowResourcesConnections the number of connections
150      * @see {@link #setLowResourcesMaxIdleTime(long)}
151      */
setLowResourcesConnections(long lowResourcesConnections)152     public void setLowResourcesConnections(long lowResourcesConnections)
153     {
154         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
155     }
156 
157     /* ------------------------------------------------------------ */
158     /**
159      * @return the lowResourcesMaxIdleTime
160      */
getLowResourcesMaxIdleTime()161     public long getLowResourcesMaxIdleTime()
162     {
163         return _lowResourcesMaxIdleTime;
164     }
165 
166     /* ------------------------------------------------------------ */
167     /**
168      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
169      * @see {@link #setMaxIdleTime(long)}
170      */
setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)171     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
172     {
173         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * @param acceptorID
179      * @throws IOException
180      */
doSelect(int acceptorID)181     public void doSelect(int acceptorID) throws IOException
182     {
183         SelectSet[] sets= _selectSet;
184         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
185             sets[acceptorID].doSelect();
186     }
187 
188 
189     /* ------------------------------------------------------------ */
190     /**
191      * @param delaySelectKeyUpdate
192      */
setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)193     public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
194     {
195         _delaySelectKeyUpdate=delaySelectKeyUpdate;
196     }
197 
198     /* ------------------------------------------------------------ */
    /**
     * Accepts a connection for a key that reported accept readiness.
     * Called from <code>SelectSet.doSelect()</code> for a selected key whose
     * attachment is not an endpoint and for which <code>isAcceptable()</code> is true.
     *
     * @param key the selected key that is acceptable
     * @return the accepted channel, or <code>null</code> if there is nothing to
     *         accept (the caller then skips the key)
     * @throws IOException if the accept fails
     */
    protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
205 
206     /* ------------------------------------------------------------------------------- */
    /**
     * Hands a task to the implementation for execution.
     * Within this file it is called by <code>SelectSet.doSelect()</code> for every
     * <code>Runnable</code> queued through <code>addChange(Object)</code>; the
     * return value is ignored there.
     *
     * @param task the task to run
     * @return implementation defined; presumably whether the task was accepted
     *         for execution (confirm against the implementations)
     * @throws IOException if the task cannot be dispatched
     */
    public abstract boolean dispatch(Runnable task) throws IOException;
208 
209     /* ------------------------------------------------------------ */
210     /* (non-Javadoc)
211      * @see org.mortbay.component.AbstractLifeCycle#doStart()
212      */
doStart()213     protected void doStart() throws Exception
214     {
215         _selectSet = new SelectSet[_selectSets];
216         for (int i=0;i<_selectSet.length;i++)
217             _selectSet[i]= new SelectSet(i);
218 
219         super.doStart();
220     }
221 
222 
223     /* ------------------------------------------------------------------------------- */
doStop()224     protected void doStop() throws Exception
225     {
226         SelectSet[] sets= _selectSet;
227         _selectSet=null;
228         if (sets!=null)
229             for (int i=0;i<sets.length;i++)
230             {
231                 SelectSet set = sets[i];
232                 if (set!=null)
233                     set.stop();
234             }
235         super.doStop();
236     }
237 
238     /* ------------------------------------------------------------ */
    /**
     * Notification hook for an endpoint that has been closed.
     * Not invoked anywhere in this file; presumably called by
     * <code>SelectChannelEndPoint</code> (confirm against that class).
     *
     * @param endpoint the endpoint that was closed
     */
    protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
243 
244     /* ------------------------------------------------------------ */
    /**
     * Notification hook for an endpoint that has been opened.
     * Not invoked anywhere in this file; presumably called by
     * <code>SelectChannelEndPoint</code> (confirm against that class).
     *
     * @param endpoint the endpoint that was opened
     */
    protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
249 
250     /* ------------------------------------------------------------------------------- */
    /**
     * Factory for the {@link Connection} associated with a channel and its endpoint.
     * Not invoked anywhere in this file; presumably used while an endpoint is
     * being created (confirm against the implementations).
     *
     * @param channel the socket channel of the connection
     * @param endpoint the endpoint wrapping the channel
     * @return the connection for the endpoint
     */
    protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
252 
253     /* ------------------------------------------------------------ */
    /**
     * Factory for the endpoint that wraps a channel registered with a select set.
     * <code>SelectSet.doSelect()</code> calls this after registering a connected or
     * accepted channel, after a pending connect finishes, and for a selected key
     * that has no endpoint attached yet; the result is then attached to the key.
     *
     * @param channel the channel to wrap
     * @param selectSet the select set whose selector the channel is registered with
     * @param sKey the selection key of that registration
     * @return the new endpoint
     * @throws IOException if the endpoint cannot be created
     */
    protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
262 
263     /* ------------------------------------------------------------------------------- */
    /**
     * Called by <code>SelectSet.doSelect()</code> when <code>finishConnect()</code>
     * on a connecting channel throws. This implementation only logs the exception
     * as a warning; subclasses may override it.
     *
     * @param channel the channel whose connect failed
     * @param ex the exception thrown by <code>finishConnect()</code>
     * @param attachment the attachment of the channel's selection key
     */
    protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
    {
        Log.warn(ex);
    }
268 
269     /* ------------------------------------------------------------------------------- */
270     /* ------------------------------------------------------------------------------- */
271     /* ------------------------------------------------------------------------------- */
272     public class SelectSet
273     {
        // Index (0 or 1) of the list in _changes that currently receives new changes.
        private transient int _change;
        // Two change lists: addChange(...) appends to _changes[_change]; doSelect() takes that list
        // and flips the index so later changes go to the other one. Guarded by synchronized(_changes).
        private transient List[] _changes;
        // Idle timeout queue; doSelect() switches its duration between the normal and low-resources idle time.
        private transient Timeout _idleTimeout;
        // Round-robin cursor choosing which select set receives the next accepted channel.
        private transient int _nextSet;
        // Timeout queue for tasks scheduled with an explicit delay via scheduleTimeout().
        private transient Timeout _retryTimeout;
        // Selector owned by this set; opened in the constructor, closed and set to null in stop().
        private transient Selector _selector;
        // ID given to the constructor; doStart() passes this set's index in the manager's array.
        private transient int _setID;
        // True while doSelect() is executing; stop() spins until it reads false.
        private transient boolean _selecting;
        // Number of consecutive selects that returned early with nothing selected (suspected JVM selector bug).
        private transient int _jvmBug;
283 
284         /* ------------------------------------------------------------ */
SelectSet(int acceptorID)285         SelectSet(int acceptorID) throws Exception
286         {
287             _setID=acceptorID;
288 
289             _idleTimeout = new Timeout(this);
290             _idleTimeout.setDuration(getMaxIdleTime());
291             _retryTimeout = new Timeout(this);
292             _retryTimeout.setDuration(0L);
293 
294             // create a selector;
295             _selector = Selector.open();
296             _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
297             _change=0;
298         }
299 
300         /* ------------------------------------------------------------ */
addChange(Object point)301         public void addChange(Object point)
302         {
303             synchronized (_changes)
304             {
305                 _changes[_change].add(point);
306                 if (point instanceof SocketChannel)
307                     _changes[_change].add(null);
308             }
309         }
310 
311         /* ------------------------------------------------------------ */
addChange(SocketChannel channel, Object att)312         public void addChange(SocketChannel channel, Object att)
313         {
314             synchronized (_changes)
315             {
316                 _changes[_change].add(channel);
317                 _changes[_change].add(att);
318             }
319         }
320 
321         /* ------------------------------------------------------------ */
        /**
         * Cancels the given timeout task while holding this select set's monitor.
         *
         * @param task the task to cancel
         */
        public void cancelIdle(Timeout.Task task)
        {
            // this SelectSet was passed to both Timeout constructors; presumably it is their lock object
            synchronized (this)
            {
                task.cancel();
            }
        }
329 
330         /* ------------------------------------------------------------ */
331         /**
332          * Select and dispatch tasks found from changes and the selector.
333          *
334          * @throws IOException
335          */
doSelect()336         public void doSelect() throws IOException
337         {
338             SelectionKey key=null;
339 
340             try
341             {
342                 List changes;
343                 synchronized (_changes)
344                 {
345                     changes=_changes[_change];
346                     _change=_change==0?1:0;
347                     _selecting=true;
348                 }
349 
350                 // Make any key changes required
351                 for (int i = 0; i < changes.size(); i++)
352                 {
353                     try
354                     {
355                         Object o = changes.get(i);
356                         if (o instanceof EndPoint)
357                         {
358                             // Update the operations for a key.
359                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
360                             endpoint.doUpdateKey();
361                         }
362                         else if (o instanceof Runnable)
363                         {
364                             dispatch((Runnable)o);
365                         }
366                         else if (o instanceof SocketChannel)
367                         {
368                             // finish accepting/connecting this connection
369                             SocketChannel channel=(SocketChannel)o;
370                             Object att = changes.get(++i);
371 
372                             if (channel.isConnected())
373                             {
374                                 key = channel.register(_selector,SelectionKey.OP_READ,att);
375                                 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
376                                 key.attach(endpoint);
377                                 endpoint.dispatch();
378                             }
379                             else
380                             {
381                                 channel.register(_selector,SelectionKey.OP_CONNECT,att);
382                             }
383 
384                         }
385                         else if (o instanceof ServerSocketChannel)
386                         {
387                             ServerSocketChannel channel = (ServerSocketChannel)o;
388                             channel.register(getSelector(),SelectionKey.OP_ACCEPT);
389                         }
390                         else
391                             throw new IllegalArgumentException(o.toString());
392                     }
393                     catch (CancelledKeyException e)
394                     {
395                         if (isRunning())
396                             Log.warn(e);
397                         else
398                             Log.debug(e);
399                     }
400                 }
401                 changes.clear();
402 
403                 long idle_next = 0;
404                 long retry_next = 0;
405                 long now=System.currentTimeMillis();
406                 synchronized (this)
407                 {
408                     _idleTimeout.setNow(now);
409                     _retryTimeout.setNow(now);
410                     if (_lowResourcesConnections>0 && _selector.keys().size()>_lowResourcesConnections)
411                         _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
412                     else
413                         _idleTimeout.setDuration(_maxIdleTime);
414                     idle_next=_idleTimeout.getTimeToNext();
415                     retry_next=_retryTimeout.getTimeToNext();
416                 }
417 
418                 // workout how low to wait in select
419                 long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
420                 if (idle_next >= 0 && wait > idle_next)
421                     wait = idle_next;
422                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
423                     wait = retry_next;
424 
425                 // Do the select.
426                 if (wait > 10) // TODO tune or configure this
427                 {
428                     long before=now;
429                     int selected=_selector.select(wait);
430                     now = System.currentTimeMillis();
431                     _idleTimeout.setNow(now);
432                     _retryTimeout.setNow(now);
433 
434                     // Look for JVM bug
435                     if (selected==0 && wait>0 && (now-before)<wait/2 && _selector.selectedKeys().size()==0)
436                     {
437                         if (_jvmBug++>5)  // TODO tune or configure this
438                         {
439                             // Probably JVM BUG!
440 
441                             Iterator iter = _selector.keys().iterator();
442                             while(iter.hasNext())
443                             {
444                                 key = (SelectionKey) iter.next();
445                                 if (key.isValid()&&key.interestOps()==0)
446                                 {
447                                     key.cancel();
448                                 }
449                             }
450                             try
451                             {
452                                 Thread.sleep(20);  // tune or configure this
453                             }
454                             catch (InterruptedException e)
455                             {
456                                 Log.ignore(e);
457                             }
458                         }
459                     }
460                     else
461                         _jvmBug=0;
462                 }
463                 else
464                 {
465                     _selector.selectNow();
466                     _jvmBug=0;
467                 }
468 
469                 // have we been destroyed while sleeping\
470                 if (_selector==null || !_selector.isOpen())
471                     return;
472 
473                 // Look for things to do
474                 Iterator iter = _selector.selectedKeys().iterator();
475                 while (iter.hasNext())
476                 {
477                     key = (SelectionKey) iter.next();
478 
479                     try
480                     {
481                         if (!key.isValid())
482                         {
483                             key.cancel();
484                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
485                             if (endpoint != null)
486                                 endpoint.doUpdateKey();
487                             continue;
488                         }
489 
490                         Object att = key.attachment();
491                         if (att instanceof SelectChannelEndPoint)
492                         {
493                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
494                             endpoint.dispatch();
495                         }
496                         else if (key.isAcceptable())
497                         {
498                             SocketChannel channel = acceptChannel(key);
499                             if (channel==null)
500                                 continue;
501 
502                             channel.configureBlocking(false);
503 
504                             // TODO make it reluctant to leave 0
505                             _nextSet=++_nextSet%_selectSet.length;
506 
507                             // Is this for this selectset
508                             if (_nextSet==_setID)
509                             {
510                                 // bind connections to this select set.
511                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
512                                 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
513                                 cKey.attach(endpoint);
514                                 if (endpoint != null)
515                                     endpoint.dispatch();
516                             }
517                             else
518                             {
519                                 // nope - give it to another.
520                                 _selectSet[_nextSet].addChange(channel);
521                                 _selectSet[_nextSet].wakeup();
522                             }
523                         }
524                         else if (key.isConnectable())
525                         {
526                             // Complete a connection of a registered channel
527                             SocketChannel channel = (SocketChannel)key.channel();
528                             boolean connected=false;
529                             try
530                             {
531                                 connected=channel.finishConnect();
532                             }
533                             catch(Exception e)
534                             {
535                                 connectionFailed(channel,e,att);
536                             }
537                             finally
538                             {
539                                 if (connected)
540                                 {
541                                     key.interestOps(SelectionKey.OP_READ);
542                                     SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
543                                     key.attach(endpoint);
544                                     endpoint.dispatch();
545                                 }
546                                 else
547                                 {
548                                     key.cancel();
549                                 }
550                             }
551                         }
552                         else
553                         {
554                             // Wrap readable registered channel in an endpoint
555                             SocketChannel channel = (SocketChannel)key.channel();
556                             SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
557                             key.attach(endpoint);
558                             if (key.isReadable())
559                                 endpoint.dispatch();
560                         }
561                         key = null;
562                     }
563                     catch (CancelledKeyException e)
564                     {
565                         Log.ignore(e);
566                     }
567                     catch (Exception e)
568                     {
569                         if (isRunning())
570                             Log.warn(e);
571                         else
572                             Log.ignore(e);
573 
574                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
575                         {
576                             key.interestOps(0);
577 
578                             key.cancel();
579                         }
580                     }
581                 }
582 
583                 // Everything always handled
584                 _selector.selectedKeys().clear();
585 
586                 // tick over the timers
587                 _idleTimeout.tick(now);
588                 _retryTimeout.tick(now);
589 
590             }
591             catch (CancelledKeyException e)
592             {
593                 Log.ignore(e);
594             }
595             finally
596             {
597                 synchronized(this)
598                 {
599                     _selecting=false;
600                 }
601             }
602         }
603 
604         /* ------------------------------------------------------------ */
        /**
         * @return the enclosing {@link SelectorManager} instance that owns this select set
         */
        public SelectorManager getManager()
        {
            return SelectorManager.this;
        }
609 
610         /* ------------------------------------------------------------ */
getNow()611         public long getNow()
612         {
613             return _idleTimeout.getNow();
614         }
615 
616         /* ------------------------------------------------------------ */
scheduleIdle(Timeout.Task task)617         public void scheduleIdle(Timeout.Task task)
618         {
619             synchronized (this)
620             {
621                 if (_idleTimeout.getDuration() <= 0)
622                     return;
623 
624                 task.schedule(_idleTimeout);
625             }
626         }
627 
628         /* ------------------------------------------------------------ */
scheduleTimeout(Timeout.Task task, long timeout)629         public void scheduleTimeout(Timeout.Task task, long timeout)
630         {
631             synchronized (this)
632             {
633                 _retryTimeout.schedule(task, timeout);
634             }
635         }
636 
637         /* ------------------------------------------------------------ */
wakeup()638         public void wakeup()
639         {
640             Selector selector = _selector;
641             if (selector!=null)
642                 selector.wakeup();
643         }
644 
645         /* ------------------------------------------------------------ */
getSelector()646         Selector getSelector()
647         {
648             return _selector;
649         }
650 
651         /* ------------------------------------------------------------ */
stop()652         void stop() throws Exception
653         {
654             boolean selecting=true;
655             while(selecting)
656             {
657                 wakeup();
658                 synchronized (this)
659                 {
660                     selecting=_selecting;
661                 }
662             }
663 
664             ArrayList keys=new ArrayList(_selector.keys());
665             Iterator iter =keys.iterator();
666 
667             while (iter.hasNext())
668             {
669                 SelectionKey key = (SelectionKey)iter.next();
670                 if (key==null)
671                     continue;
672                 EndPoint endpoint = (EndPoint)key.attachment();
673                 if (endpoint!=null)
674                 {
675                     try
676                     {
677                         endpoint.close();
678                     }
679                     catch(IOException e)
680                     {
681                         Log.ignore(e);
682                     }
683                 }
684             }
685 
686             synchronized (this)
687             {
688                 _idleTimeout.cancelAll();
689                 _retryTimeout.cancelAll();
690                 try
691                 {
692                     if (_selector != null)
693                         _selector.close();
694                 }
695                 catch (IOException e)
696                 {
697                     Log.ignore(e);
698                 }
699                 _selector=null;
700             }
701         }
702     }
703 
704 }