1 /*
2  * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.io.IOException;
29 import java.nio.channels.ClosedSelectorException;
30 import java.nio.channels.Pipe;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.spi.SelectorProvider;
34 import java.util.ArrayDeque;
35 import java.util.ArrayList;
36 import java.util.Deque;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.function.Consumer;
41 import jdk.internal.misc.Unsafe;
42 
43 /**
44  * A multi-threaded implementation of Selector for Windows.
45  *
46  * @author Konstantin Kladko
47  * @author Mark Reinhold
48  */
49 
50 class WindowsSelectorImpl extends SelectorImpl {
51     private static final Unsafe unsafe = Unsafe.getUnsafe();
52     private static int addressSize = unsafe.addressSize();
53 
dependsArch(int value32, int value64)54     private static int dependsArch(int value32, int value64) {
55         return (addressSize == 4) ? value32 : value64;
56     }
57 
58     // Initial capacity of the poll array
59     private final int INIT_CAP = 8;
60     // Maximum number of sockets for select().
61     // Should be INIT_CAP times a power of 2
62     private static final int MAX_SELECTABLE_FDS = 1024;
63 
64     // Size of FD_SET struct to allocate a buffer for it in SubSelector,
65     // aligned to 8 bytes on 64-bit:
66     // struct { unsigned int fd_count; SOCKET fd_array[MAX_SELECTABLE_FDS]; }.
67     private static final long SIZEOF_FD_SET = dependsArch(
68             4 + MAX_SELECTABLE_FDS * 4,      // SOCKET = unsigned int
69             4 + MAX_SELECTABLE_FDS * 8 + 4); // SOCKET = unsigned __int64
70 
71     // The list of SelectableChannels serviced by this Selector. Every mod
72     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
73     // array,  where the corresponding entry is occupied by the wakeupSocket
74     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
75 
76     // The global native poll array holds file decriptors and event masks
77     private PollArrayWrapper pollWrapper;
78 
79     // The number of valid entries in  poll array, including entries occupied
80     // by wakeup socket handle.
81     private int totalChannels = 1;
82 
83     // Number of helper threads needed for select. We need one thread per
84     // each additional set of MAX_SELECTABLE_FDS - 1 channels.
85     private int threadsCount = 0;
86 
87     // A list of helper threads for select.
88     private final List<SelectThread> threads = new ArrayList<SelectThread>();
89 
90     //Pipe used as a wakeup object.
91     private final Pipe wakeupPipe;
92 
93     // File descriptors corresponding to source and sink
94     private final int wakeupSourceFd, wakeupSinkFd;
95 
96     // Maps file descriptors to their indices in  pollArray
97     private static final class FdMap extends HashMap<Integer, MapEntry> {
98         static final long serialVersionUID = 0L;
get(int desc)99         private MapEntry get(int desc) {
100             return get(Integer.valueOf(desc));
101         }
put(SelectionKeyImpl ski)102         private MapEntry put(SelectionKeyImpl ski) {
103             return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
104         }
remove(SelectionKeyImpl ski)105         private MapEntry remove(SelectionKeyImpl ski) {
106             Integer fd = Integer.valueOf(ski.getFDVal());
107             MapEntry x = get(fd);
108             if ((x != null) && (x.ski.channel() == ski.channel()))
109                 return remove(fd);
110             return null;
111         }
112     }
113 
114     // class for fdMap entries
115     private static final class MapEntry {
116         final SelectionKeyImpl ski;
117         long updateCount = 0;
MapEntry(SelectionKeyImpl ski)118         MapEntry(SelectionKeyImpl ski) {
119             this.ski = ski;
120         }
121     }
122     private final FdMap fdMap = new FdMap();
123 
124     // SubSelector for the main thread
125     private final SubSelector subSelector = new SubSelector();
126 
127     private long timeout; //timeout for poll
128 
129     // Lock for interrupt triggering and clearing
130     private final Object interruptLock = new Object();
131     private volatile boolean interruptTriggered;
132 
133     // pending new registrations/updates, queued by implRegister and setEventOps
134     private final Object updateLock = new Object();
135     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
136     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
137 
138 
WindowsSelectorImpl(SelectorProvider sp)139     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
140         super(sp);
141         pollWrapper = new PollArrayWrapper(INIT_CAP);
142         wakeupPipe = Pipe.open();
143         wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
144 
145         // Disable the Nagle algorithm so that the wakeup is more immediate
146         SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
147         (sink.sc).socket().setTcpNoDelay(true);
148         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
149 
150         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
151     }
152 
ensureOpen()153     private void ensureOpen() {
154         if (!isOpen())
155             throw new ClosedSelectorException();
156     }
157 
158     @Override
doSelect(Consumer<SelectionKey> action, long timeout)159     protected int doSelect(Consumer<SelectionKey> action, long timeout)
160         throws IOException
161     {
162         assert Thread.holdsLock(this);
163         this.timeout = timeout; // set selector timeout
164         processUpdateQueue();
165         processDeregisterQueue();
166         if (interruptTriggered) {
167             resetWakeupSocket();
168             return 0;
169         }
170         // Calculate number of helper threads needed for poll. If necessary
171         // threads are created here and start waiting on startLock
172         adjustThreadsCount();
173         finishLock.reset(); // reset finishLock
174         // Wakeup helper threads, waiting on startLock, so they start polling.
175         // Redundant threads will exit here after wakeup.
176         startLock.startThreads();
177         // do polling in the main thread. Main thread is responsible for
178         // first MAX_SELECTABLE_FDS entries in pollArray.
179         try {
180             begin();
181             try {
182                 subSelector.poll();
183             } catch (IOException e) {
184                 finishLock.setException(e); // Save this exception
185             }
186             // Main thread is out of poll(). Wakeup others and wait for them
187             if (threads.size() > 0)
188                 finishLock.waitForHelperThreads();
189           } finally {
190               end();
191           }
192         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
193         finishLock.checkForException();
194         processDeregisterQueue();
195         int updated = updateSelectedKeys(action);
196         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
197         resetWakeupSocket();
198         return updated;
199     }
200 
201     /**
202      * Process new registrations and changes to the interest ops.
203      */
processUpdateQueue()204     private void processUpdateQueue() {
205         assert Thread.holdsLock(this);
206 
207         synchronized (updateLock) {
208             SelectionKeyImpl ski;
209 
210             // new registrations
211             while ((ski = newKeys.pollFirst()) != null) {
212                 if (ski.isValid()) {
213                     growIfNeeded();
214                     channelArray[totalChannels] = ski;
215                     ski.setIndex(totalChannels);
216                     pollWrapper.putEntry(totalChannels, ski);
217                     totalChannels++;
218                     MapEntry previous = fdMap.put(ski);
219                     assert previous == null;
220                 }
221             }
222 
223             // changes to interest ops
224             while ((ski = updateKeys.pollFirst()) != null) {
225                 int events = ski.translateInterestOps();
226                 int fd = ski.getFDVal();
227                 if (ski.isValid() && fdMap.containsKey(fd)) {
228                     int index = ski.getIndex();
229                     assert index >= 0 && index < totalChannels;
230                     pollWrapper.putEventOps(index, events);
231                 }
232             }
233         }
234     }
235 
236     // Helper threads wait on this lock for the next poll.
237     private final StartLock startLock = new StartLock();
238 
239     private final class StartLock {
240         // A variable which distinguishes the current run of doSelect from the
241         // previous one. Incrementing runsCounter and notifying threads will
242         // trigger another round of poll.
243         private long runsCounter;
244        // Triggers threads, waiting on this lock to start polling.
245         private synchronized void startThreads() {
246             runsCounter++; // next run
247             notifyAll(); // wake up threads.
248         }
249         // This function is called by a helper thread to wait for the
250         // next round of poll(). It also checks, if this thread became
251         // redundant. If yes, it returns true, notifying the thread
252         // that it should exit.
253         private synchronized boolean waitForStart(SelectThread thread) {
254             while (true) {
255                 while (runsCounter == thread.lastRun) {
256                     try {
257                         startLock.wait();
258                     } catch (InterruptedException e) {
259                         Thread.currentThread().interrupt();
260                     }
261                 }
262                 if (thread.isZombie()) { // redundant thread
263                     return true; // will cause run() to exit.
264                 } else {
265                     thread.lastRun = runsCounter; // update lastRun
266                     return false; //   will cause run() to poll.
267                 }
268             }
269         }
270     }
271 
272     // Main thread waits on this lock, until all helper threads are done
273     // with poll().
274     private final FinishLock finishLock = new FinishLock();
275 
276     private final class FinishLock  {
277         // Number of helper threads, that did not finish yet.
278         private int threadsToFinish;
279 
280         // IOException which occurred during the last run.
281         IOException exception = null;
282 
283         // Called before polling.
284         private void reset() {
285             threadsToFinish = threads.size(); // helper threads
286         }
287 
288         // Each helper thread invokes this function on finishLock, when
289         // the thread is done with poll().
290         private synchronized void threadFinished() {
291             if (threadsToFinish == threads.size()) { // finished poll() first
292                 // if finished first, wakeup others
293                 wakeup();
294             }
295             threadsToFinish--;
296             if (threadsToFinish == 0) // all helper threads finished poll().
297                 notify();             // notify the main thread
298         }
299 
300         // The main thread invokes this function on finishLock to wait
301         // for helper threads to finish poll().
302         private synchronized void waitForHelperThreads() {
303             if (threadsToFinish == threads.size()) {
304                 // no helper threads finished yet. Wakeup them up.
305                 wakeup();
306             }
307             while (threadsToFinish != 0) {
308                 try {
309                     finishLock.wait();
310                 } catch (InterruptedException e) {
311                     // Interrupted - set interrupted state.
312                     Thread.currentThread().interrupt();
313                 }
314             }
315         }
316 
317         // sets IOException for this run
318         private synchronized void setException(IOException e) {
319             exception = e;
320         }
321 
322         // Checks if there was any exception during the last run.
323         // If yes, throws it
324         private void checkForException() throws IOException {
325             if (exception == null)
326                 return;
327             StringBuffer message =  new StringBuffer("An exception occurred" +
328                                        " during the execution of select(): \n");
329             message.append(exception);
330             message.append('\n');
331             exception = null;
332             throw new IOException(message.toString());
333         }
334     }
335 
336     private final class SubSelector {
337         private final int pollArrayIndex; // starting index in pollArray to poll
338         // These arrays will hold result of native select().
339         // The first element of each array is the number of selected sockets.
340         // Other elements are file descriptors of selected sockets.
341         private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
342         private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
343         private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
344         // Buffer for readfds, writefds and exceptfds structs that are passed
345         // to native select().
346         private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 6);
347 
348         private SubSelector() {
349             this.pollArrayIndex = 0; // main thread
350         }
351 
352         private SubSelector(int threadIndex) { // helper threads
353             this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
354         }
355 
356         private int poll() throws IOException{ // poll for the main thread
357             return poll0(pollWrapper.pollArrayAddress,
358                          Math.min(totalChannels, MAX_SELECTABLE_FDS),
359                          readFds, writeFds, exceptFds, timeout, fdsBuffer);
360         }
361 
362         private int poll(int index) throws IOException {
363             // poll for helper threads
364             return  poll0(pollWrapper.pollArrayAddress +
365                      (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
366                      Math.min(MAX_SELECTABLE_FDS,
367                              totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
368                      readFds, writeFds, exceptFds, timeout, fdsBuffer);
369         }
370 
371         private native int poll0(long pollAddress, int numfds,
372              int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer);
373 
374         private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
375             int numKeysUpdated = 0;
376             numKeysUpdated += processFDSet(updateCount, action, readFds,
377                                            Net.POLLIN,
378                                            false);
379             numKeysUpdated += processFDSet(updateCount, action, writeFds,
380                                            Net.POLLCONN |
381                                            Net.POLLOUT,
382                                            false);
383             numKeysUpdated += processFDSet(updateCount, action, exceptFds,
384                                            Net.POLLIN |
385                                            Net.POLLCONN |
386                                            Net.POLLOUT,
387                                            true);
388             return numKeysUpdated;
389         }
390 
391         /**
392          * updateCount is used to tell if a key has been counted as updated
393          * in this select operation.
394          *
395          * me.updateCount <= updateCount
396          */
397         private int processFDSet(long updateCount,
398                                  Consumer<SelectionKey> action,
399                                  int[] fds, int rOps,
400                                  boolean isExceptFds)
401         {
402             int numKeysUpdated = 0;
403             for (int i = 1; i <= fds[0]; i++) {
404                 int desc = fds[i];
405                 if (desc == wakeupSourceFd) {
406                     synchronized (interruptLock) {
407                         interruptTriggered = true;
408                     }
409                     continue;
410                 }
411                 MapEntry me = fdMap.get(desc);
412                 // If me is null, the key was deregistered in the previous
413                 // processDeregisterQueue.
414                 if (me == null)
415                     continue;
416                 SelectionKeyImpl sk = me.ski;
417 
418                 // The descriptor may be in the exceptfds set because there is
419                 // OOB data queued to the socket. If there is OOB data then it
420                 // is discarded and the key is not added to the selected set.
421                 if (isExceptFds &&
422                     (sk.channel() instanceof SocketChannelImpl) &&
423                     discardUrgentData(desc))
424                 {
425                     continue;
426                 }
427 
428                 int updated = processReadyEvents(rOps, sk, action);
429                 if (updated > 0 && me.updateCount != updateCount) {
430                     me.updateCount = updateCount;
431                     numKeysUpdated++;
432                 }
433             }
434             return numKeysUpdated;
435         }
436 
437         private void freeFDSetBuffer() {
438             unsafe.freeMemory(fdsBuffer);
439         }
440     }
441 
442     // Represents a helper thread used for select.
443     private final class SelectThread extends Thread {
444         private final int index; // index of this thread
445         final SubSelector subSelector;
446         private long lastRun = 0; // last run number
447         private volatile boolean zombie;
448         // Creates a new thread
449         private SelectThread(int i) {
450             super(null, null, "SelectorHelper", 0, false);
451             this.index = i;
452             this.subSelector = new SubSelector(i);
453             //make sure we wait for next round of poll
454             this.lastRun = startLock.runsCounter;
455         }
456         void makeZombie() {
457             zombie = true;
458         }
459         boolean isZombie() {
460             return zombie;
461         }
462         public void run() {
463             while (true) { // poll loop
464                 // wait for the start of poll. If this thread has become
465                 // redundant, then exit.
466                 if (startLock.waitForStart(this)) {
467                     subSelector.freeFDSetBuffer();
468                     return;
469                 }
470                 // call poll()
471                 try {
472                     subSelector.poll(index);
473                 } catch (IOException e) {
474                     // Save this exception and let other threads finish.
475                     finishLock.setException(e);
476                 }
477                 // notify main thread, that this thread has finished, and
478                 // wakeup others, if this thread is the first to finish.
479                 finishLock.threadFinished();
480             }
481         }
482     }
483 
484     // After some channels registered/deregistered, the number of required
485     // helper threads may have changed. Adjust this number.
486     private void adjustThreadsCount() {
threads.size()487         if (threadsCount > threads.size()) {
488             // More threads needed. Start more threads.
489             for (int i = threads.size(); i < threadsCount; i++) {
490                 SelectThread newThread = new SelectThread(i);
491                 threads.add(newThread);
492                 newThread.setDaemon(true);
493                 newThread.start();
494             }
495         } else if (threadsCount < threads.size()) {
496             // Some threads become redundant. Remove them from the threads List.
497             for (int i = threads.size() - 1 ; i >= threadsCount; i--)
498                 threads.remove(i).makeZombie();
499         }
500     }
501 
502     // Sets Windows wakeup socket to a signaled state.
setWakeupSocket()503     private void setWakeupSocket() {
504         setWakeupSocket0(wakeupSinkFd);
505     }
506     private native void setWakeupSocket0(int wakeupSinkFd);
507 
508     // Sets Windows wakeup socket to a non-signaled state.
resetWakeupSocket()509     private void resetWakeupSocket() {
510         synchronized (interruptLock) {
511             if (interruptTriggered == false)
512                 return;
513             resetWakeupSocket0(wakeupSourceFd);
514             interruptTriggered = false;
515         }
516     }
517 
518     private native void resetWakeupSocket0(int wakeupSourceFd);
519 
520     private native boolean discardUrgentData(int fd);
521 
522     // We increment this counter on each call to updateSelectedKeys()
523     // each entry in  SubSelector.fdsMap has a memorized value of
524     // updateCount. When we increment numKeysUpdated we set updateCount
525     // for the corresponding entry to its current value. This is used to
526     // avoid counting the same key more than once - the same key can
527     // appear in readfds and writefds.
528     private long updateCount = 0;
529 
530     // Update ops of the corresponding Channels. Add the ready keys to the
531     // ready queue.
updateSelectedKeys(Consumer<SelectionKey> action)532     private int updateSelectedKeys(Consumer<SelectionKey> action) {
533         updateCount++;
534         int numKeysUpdated = 0;
535         numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
536         for (SelectThread t: threads) {
537             numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
538         }
539         return numKeysUpdated;
540     }
541 
542     @Override
implClose()543     protected void implClose() throws IOException {
544         assert !isOpen();
545         assert Thread.holdsLock(this);
546 
547         // prevent further wakeup
548         synchronized (interruptLock) {
549             interruptTriggered = true;
550         }
551 
552         wakeupPipe.sink().close();
553         wakeupPipe.source().close();
554         pollWrapper.free();
555 
556         // Make all remaining helper threads exit
557         for (SelectThread t: threads)
558              t.makeZombie();
559         startLock.startThreads();
560         subSelector.freeFDSetBuffer();
561     }
562 
563     @Override
implRegister(SelectionKeyImpl ski)564     protected void implRegister(SelectionKeyImpl ski) {
565         ensureOpen();
566         synchronized (updateLock) {
567             newKeys.addLast(ski);
568         }
569     }
570 
growIfNeeded()571     private void growIfNeeded() {
572         if (channelArray.length == totalChannels) {
573             int newSize = totalChannels * 2; // Make a larger array
574             SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
575             System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
576             channelArray = temp;
577             pollWrapper.grow(newSize);
578         }
579         if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
580             pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
581             totalChannels++;
582             threadsCount++;
583         }
584     }
585 
586     @Override
implDereg(SelectionKeyImpl ski)587     protected void implDereg(SelectionKeyImpl ski) {
588         assert !ski.isValid();
589         assert Thread.holdsLock(this);
590 
591         if (fdMap.remove(ski) != null) {
592             int i = ski.getIndex();
593             assert (i >= 0);
594 
595             if (i != totalChannels - 1) {
596                 // Copy end one over it
597                 SelectionKeyImpl endChannel = channelArray[totalChannels-1];
598                 channelArray[i] = endChannel;
599                 endChannel.setIndex(i);
600                 pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
601             }
602             ski.setIndex(-1);
603 
604             channelArray[totalChannels - 1] = null;
605             totalChannels--;
606             if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
607                 totalChannels--;
608                 threadsCount--; // The last thread has become redundant.
609             }
610         }
611     }
612 
613     @Override
setEventOps(SelectionKeyImpl ski)614     public void setEventOps(SelectionKeyImpl ski) {
615         ensureOpen();
616         synchronized (updateLock) {
617             updateKeys.addLast(ski);
618         }
619     }
620 
621     @Override
wakeup()622     public Selector wakeup() {
623         synchronized (interruptLock) {
624             if (!interruptTriggered) {
625                 setWakeupSocket();
626                 interruptTriggered = true;
627             }
628         }
629         return this;
630     }
631 
632     static {
633         IOUtil.load();
634     }
635 }
636