1 /* 2 * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.IOException; 29 import java.nio.channels.ClosedSelectorException; 30 import java.nio.channels.Pipe; 31 import java.nio.channels.SelectionKey; 32 import java.nio.channels.Selector; 33 import java.nio.channels.spi.SelectorProvider; 34 import java.util.ArrayDeque; 35 import java.util.ArrayList; 36 import java.util.Deque; 37 import java.util.HashMap; 38 import java.util.List; 39 import java.util.Map; 40 import java.util.function.Consumer; 41 import jdk.internal.misc.Unsafe; 42 43 /** 44 * A multi-threaded implementation of Selector for Windows. 45 * 46 * @author Konstantin Kladko 47 * @author Mark Reinhold 48 */ 49 50 class WindowsSelectorImpl extends SelectorImpl { 51 private static final Unsafe unsafe = Unsafe.getUnsafe(); 52 private static int addressSize = unsafe.addressSize(); 53 dependsArch(int value32, int value64)54 private static int dependsArch(int value32, int value64) { 55 return (addressSize == 4) ? value32 : value64; 56 } 57 58 // Initial capacity of the poll array 59 private final int INIT_CAP = 8; 60 // Maximum number of sockets for select(). 61 // Should be INIT_CAP times a power of 2 62 private static final int MAX_SELECTABLE_FDS = 1024; 63 64 // Size of FD_SET struct to allocate a buffer for it in SubSelector, 65 // aligned to 8 bytes on 64-bit: 66 // struct { unsigned int fd_count; SOCKET fd_array[MAX_SELECTABLE_FDS]; }. 67 private static final long SIZEOF_FD_SET = dependsArch( 68 4 + MAX_SELECTABLE_FDS * 4, // SOCKET = unsigned int 69 4 + MAX_SELECTABLE_FDS * 8 + 4); // SOCKET = unsigned __int64 70 71 // The list of SelectableChannels serviced by this Selector. Every mod 72 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll 73 // array, where the corresponding entry is occupied by the wakeupSocket 74 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; 75 76 // The global native poll array holds file decriptors and event masks 77 private PollArrayWrapper pollWrapper; 78 79 // The number of valid entries in poll array, including entries occupied 80 // by wakeup socket handle. 81 private int totalChannels = 1; 82 83 // Number of helper threads needed for select. We need one thread per 84 // each additional set of MAX_SELECTABLE_FDS - 1 channels. 85 private int threadsCount = 0; 86 87 // A list of helper threads for select. 88 private final List<SelectThread> threads = new ArrayList<SelectThread>(); 89 90 //Pipe used as a wakeup object. 91 private final Pipe wakeupPipe; 92 93 // File descriptors corresponding to source and sink 94 private final int wakeupSourceFd, wakeupSinkFd; 95 96 // Maps file descriptors to their indices in pollArray 97 private static final class FdMap extends HashMap<Integer, MapEntry> { 98 static final long serialVersionUID = 0L; get(int desc)99 private MapEntry get(int desc) { 100 return get(Integer.valueOf(desc)); 101 } put(SelectionKeyImpl ski)102 private MapEntry put(SelectionKeyImpl ski) { 103 return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski)); 104 } remove(SelectionKeyImpl ski)105 private MapEntry remove(SelectionKeyImpl ski) { 106 Integer fd = Integer.valueOf(ski.getFDVal()); 107 MapEntry x = get(fd); 108 if ((x != null) && (x.ski.channel() == ski.channel())) 109 return remove(fd); 110 return null; 111 } 112 } 113 114 // class for fdMap entries 115 private static final class MapEntry { 116 final SelectionKeyImpl ski; 117 long updateCount = 0; MapEntry(SelectionKeyImpl ski)118 MapEntry(SelectionKeyImpl ski) { 119 this.ski = ski; 120 } 121 } 122 private final FdMap fdMap = new FdMap(); 123 124 // SubSelector for the main thread 125 private final SubSelector subSelector = new SubSelector(); 126 127 private long timeout; //timeout for poll 128 129 // Lock for interrupt triggering and clearing 130 private final Object interruptLock = new Object(); 131 private volatile boolean interruptTriggered; 132 133 // pending new registrations/updates, queued by implRegister and setEventOps 134 private final Object updateLock = new Object(); 135 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); 136 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 137 138 WindowsSelectorImpl(SelectorProvider sp)139 WindowsSelectorImpl(SelectorProvider sp) throws IOException { 140 super(sp); 141 pollWrapper = new PollArrayWrapper(INIT_CAP); 142 wakeupPipe = Pipe.open(); 143 wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); 144 145 // Disable the Nagle algorithm so that the wakeup is more immediate 146 SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); 147 (sink.sc).socket().setTcpNoDelay(true); 148 wakeupSinkFd = ((SelChImpl)sink).getFDVal(); 149 150 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); 151 } 152 ensureOpen()153 private void ensureOpen() { 154 if (!isOpen()) 155 throw new ClosedSelectorException(); 156 } 157 158 @Override doSelect(Consumer<SelectionKey> action, long timeout)159 protected int doSelect(Consumer<SelectionKey> action, long timeout) 160 throws IOException 161 { 162 assert Thread.holdsLock(this); 163 this.timeout = timeout; // set selector timeout 164 processUpdateQueue(); 165 processDeregisterQueue(); 166 if (interruptTriggered) { 167 resetWakeupSocket(); 168 return 0; 169 } 170 // Calculate number of helper threads needed for poll. If necessary 171 // threads are created here and start waiting on startLock 172 adjustThreadsCount(); 173 finishLock.reset(); // reset finishLock 174 // Wakeup helper threads, waiting on startLock, so they start polling. 175 // Redundant threads will exit here after wakeup. 176 startLock.startThreads(); 177 // do polling in the main thread. Main thread is responsible for 178 // first MAX_SELECTABLE_FDS entries in pollArray. 179 try { 180 begin(); 181 try { 182 subSelector.poll(); 183 } catch (IOException e) { 184 finishLock.setException(e); // Save this exception 185 } 186 // Main thread is out of poll(). Wakeup others and wait for them 187 if (threads.size() > 0) 188 finishLock.waitForHelperThreads(); 189 } finally { 190 end(); 191 } 192 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 193 finishLock.checkForException(); 194 processDeregisterQueue(); 195 int updated = updateSelectedKeys(action); 196 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 197 resetWakeupSocket(); 198 return updated; 199 } 200 201 /** 202 * Process new registrations and changes to the interest ops. 203 */ processUpdateQueue()204 private void processUpdateQueue() { 205 assert Thread.holdsLock(this); 206 207 synchronized (updateLock) { 208 SelectionKeyImpl ski; 209 210 // new registrations 211 while ((ski = newKeys.pollFirst()) != null) { 212 if (ski.isValid()) { 213 growIfNeeded(); 214 channelArray[totalChannels] = ski; 215 ski.setIndex(totalChannels); 216 pollWrapper.putEntry(totalChannels, ski); 217 totalChannels++; 218 MapEntry previous = fdMap.put(ski); 219 assert previous == null; 220 } 221 } 222 223 // changes to interest ops 224 while ((ski = updateKeys.pollFirst()) != null) { 225 int events = ski.translateInterestOps(); 226 int fd = ski.getFDVal(); 227 if (ski.isValid() && fdMap.containsKey(fd)) { 228 int index = ski.getIndex(); 229 assert index >= 0 && index < totalChannels; 230 pollWrapper.putEventOps(index, events); 231 } 232 } 233 } 234 } 235 236 // Helper threads wait on this lock for the next poll. 237 private final StartLock startLock = new StartLock(); 238 239 private final class StartLock { 240 // A variable which distinguishes the current run of doSelect from the 241 // previous one. Incrementing runsCounter and notifying threads will 242 // trigger another round of poll. 243 private long runsCounter; 244 // Triggers threads, waiting on this lock to start polling. 245 private synchronized void startThreads() { 246 runsCounter++; // next run 247 notifyAll(); // wake up threads. 248 } 249 // This function is called by a helper thread to wait for the 250 // next round of poll(). It also checks, if this thread became 251 // redundant. If yes, it returns true, notifying the thread 252 // that it should exit. 253 private synchronized boolean waitForStart(SelectThread thread) { 254 while (true) { 255 while (runsCounter == thread.lastRun) { 256 try { 257 startLock.wait(); 258 } catch (InterruptedException e) { 259 Thread.currentThread().interrupt(); 260 } 261 } 262 if (thread.isZombie()) { // redundant thread 263 return true; // will cause run() to exit. 264 } else { 265 thread.lastRun = runsCounter; // update lastRun 266 return false; // will cause run() to poll. 267 } 268 } 269 } 270 } 271 272 // Main thread waits on this lock, until all helper threads are done 273 // with poll(). 274 private final FinishLock finishLock = new FinishLock(); 275 276 private final class FinishLock { 277 // Number of helper threads, that did not finish yet. 278 private int threadsToFinish; 279 280 // IOException which occurred during the last run. 281 IOException exception = null; 282 283 // Called before polling. 284 private void reset() { 285 threadsToFinish = threads.size(); // helper threads 286 } 287 288 // Each helper thread invokes this function on finishLock, when 289 // the thread is done with poll(). 290 private synchronized void threadFinished() { 291 if (threadsToFinish == threads.size()) { // finished poll() first 292 // if finished first, wakeup others 293 wakeup(); 294 } 295 threadsToFinish--; 296 if (threadsToFinish == 0) // all helper threads finished poll(). 297 notify(); // notify the main thread 298 } 299 300 // The main thread invokes this function on finishLock to wait 301 // for helper threads to finish poll(). 302 private synchronized void waitForHelperThreads() { 303 if (threadsToFinish == threads.size()) { 304 // no helper threads finished yet. Wakeup them up. 305 wakeup(); 306 } 307 while (threadsToFinish != 0) { 308 try { 309 finishLock.wait(); 310 } catch (InterruptedException e) { 311 // Interrupted - set interrupted state. 312 Thread.currentThread().interrupt(); 313 } 314 } 315 } 316 317 // sets IOException for this run 318 private synchronized void setException(IOException e) { 319 exception = e; 320 } 321 322 // Checks if there was any exception during the last run. 323 // If yes, throws it 324 private void checkForException() throws IOException { 325 if (exception == null) 326 return; 327 StringBuffer message = new StringBuffer("An exception occurred" + 328 " during the execution of select(): \n"); 329 message.append(exception); 330 message.append('\n'); 331 exception = null; 332 throw new IOException(message.toString()); 333 } 334 } 335 336 private final class SubSelector { 337 private final int pollArrayIndex; // starting index in pollArray to poll 338 // These arrays will hold result of native select(). 339 // The first element of each array is the number of selected sockets. 340 // Other elements are file descriptors of selected sockets. 341 private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; 342 private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; 343 private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; 344 // Buffer for readfds, writefds and exceptfds structs that are passed 345 // to native select(). 346 private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 6); 347 348 private SubSelector() { 349 this.pollArrayIndex = 0; // main thread 350 } 351 352 private SubSelector(int threadIndex) { // helper threads 353 this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS; 354 } 355 356 private int poll() throws IOException{ // poll for the main thread 357 return poll0(pollWrapper.pollArrayAddress, 358 Math.min(totalChannels, MAX_SELECTABLE_FDS), 359 readFds, writeFds, exceptFds, timeout, fdsBuffer); 360 } 361 362 private int poll(int index) throws IOException { 363 // poll for helper threads 364 return poll0(pollWrapper.pollArrayAddress + 365 (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), 366 Math.min(MAX_SELECTABLE_FDS, 367 totalChannels - (index + 1) * MAX_SELECTABLE_FDS), 368 readFds, writeFds, exceptFds, timeout, fdsBuffer); 369 } 370 371 private native int poll0(long pollAddress, int numfds, 372 int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer); 373 374 private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) { 375 int numKeysUpdated = 0; 376 numKeysUpdated += processFDSet(updateCount, action, readFds, 377 Net.POLLIN, 378 false); 379 numKeysUpdated += processFDSet(updateCount, action, writeFds, 380 Net.POLLCONN | 381 Net.POLLOUT, 382 false); 383 numKeysUpdated += processFDSet(updateCount, action, exceptFds, 384 Net.POLLIN | 385 Net.POLLCONN | 386 Net.POLLOUT, 387 true); 388 return numKeysUpdated; 389 } 390 391 /** 392 * updateCount is used to tell if a key has been counted as updated 393 * in this select operation. 394 * 395 * me.updateCount <= updateCount 396 */ 397 private int processFDSet(long updateCount, 398 Consumer<SelectionKey> action, 399 int[] fds, int rOps, 400 boolean isExceptFds) 401 { 402 int numKeysUpdated = 0; 403 for (int i = 1; i <= fds[0]; i++) { 404 int desc = fds[i]; 405 if (desc == wakeupSourceFd) { 406 synchronized (interruptLock) { 407 interruptTriggered = true; 408 } 409 continue; 410 } 411 MapEntry me = fdMap.get(desc); 412 // If me is null, the key was deregistered in the previous 413 // processDeregisterQueue. 414 if (me == null) 415 continue; 416 SelectionKeyImpl sk = me.ski; 417 418 // The descriptor may be in the exceptfds set because there is 419 // OOB data queued to the socket. If there is OOB data then it 420 // is discarded and the key is not added to the selected set. 421 if (isExceptFds && 422 (sk.channel() instanceof SocketChannelImpl) && 423 discardUrgentData(desc)) 424 { 425 continue; 426 } 427 428 int updated = processReadyEvents(rOps, sk, action); 429 if (updated > 0 && me.updateCount != updateCount) { 430 me.updateCount = updateCount; 431 numKeysUpdated++; 432 } 433 } 434 return numKeysUpdated; 435 } 436 437 private void freeFDSetBuffer() { 438 unsafe.freeMemory(fdsBuffer); 439 } 440 } 441 442 // Represents a helper thread used for select. 443 private final class SelectThread extends Thread { 444 private final int index; // index of this thread 445 final SubSelector subSelector; 446 private long lastRun = 0; // last run number 447 private volatile boolean zombie; 448 // Creates a new thread 449 private SelectThread(int i) { 450 super(null, null, "SelectorHelper", 0, false); 451 this.index = i; 452 this.subSelector = new SubSelector(i); 453 //make sure we wait for next round of poll 454 this.lastRun = startLock.runsCounter; 455 } 456 void makeZombie() { 457 zombie = true; 458 } 459 boolean isZombie() { 460 return zombie; 461 } 462 public void run() { 463 while (true) { // poll loop 464 // wait for the start of poll. If this thread has become 465 // redundant, then exit. 466 if (startLock.waitForStart(this)) { 467 subSelector.freeFDSetBuffer(); 468 return; 469 } 470 // call poll() 471 try { 472 subSelector.poll(index); 473 } catch (IOException e) { 474 // Save this exception and let other threads finish. 475 finishLock.setException(e); 476 } 477 // notify main thread, that this thread has finished, and 478 // wakeup others, if this thread is the first to finish. 479 finishLock.threadFinished(); 480 } 481 } 482 } 483 484 // After some channels registered/deregistered, the number of required 485 // helper threads may have changed. Adjust this number. 486 private void adjustThreadsCount() { threads.size()487 if (threadsCount > threads.size()) { 488 // More threads needed. Start more threads. 489 for (int i = threads.size(); i < threadsCount; i++) { 490 SelectThread newThread = new SelectThread(i); 491 threads.add(newThread); 492 newThread.setDaemon(true); 493 newThread.start(); 494 } 495 } else if (threadsCount < threads.size()) { 496 // Some threads become redundant. Remove them from the threads List. 497 for (int i = threads.size() - 1 ; i >= threadsCount; i--) 498 threads.remove(i).makeZombie(); 499 } 500 } 501 502 // Sets Windows wakeup socket to a signaled state. setWakeupSocket()503 private void setWakeupSocket() { 504 setWakeupSocket0(wakeupSinkFd); 505 } 506 private native void setWakeupSocket0(int wakeupSinkFd); 507 508 // Sets Windows wakeup socket to a non-signaled state. resetWakeupSocket()509 private void resetWakeupSocket() { 510 synchronized (interruptLock) { 511 if (interruptTriggered == false) 512 return; 513 resetWakeupSocket0(wakeupSourceFd); 514 interruptTriggered = false; 515 } 516 } 517 518 private native void resetWakeupSocket0(int wakeupSourceFd); 519 520 private native boolean discardUrgentData(int fd); 521 522 // We increment this counter on each call to updateSelectedKeys() 523 // each entry in SubSelector.fdsMap has a memorized value of 524 // updateCount. When we increment numKeysUpdated we set updateCount 525 // for the corresponding entry to its current value. This is used to 526 // avoid counting the same key more than once - the same key can 527 // appear in readfds and writefds. 528 private long updateCount = 0; 529 530 // Update ops of the corresponding Channels. Add the ready keys to the 531 // ready queue. updateSelectedKeys(Consumer<SelectionKey> action)532 private int updateSelectedKeys(Consumer<SelectionKey> action) { 533 updateCount++; 534 int numKeysUpdated = 0; 535 numKeysUpdated += subSelector.processSelectedKeys(updateCount, action); 536 for (SelectThread t: threads) { 537 numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action); 538 } 539 return numKeysUpdated; 540 } 541 542 @Override implClose()543 protected void implClose() throws IOException { 544 assert !isOpen(); 545 assert Thread.holdsLock(this); 546 547 // prevent further wakeup 548 synchronized (interruptLock) { 549 interruptTriggered = true; 550 } 551 552 wakeupPipe.sink().close(); 553 wakeupPipe.source().close(); 554 pollWrapper.free(); 555 556 // Make all remaining helper threads exit 557 for (SelectThread t: threads) 558 t.makeZombie(); 559 startLock.startThreads(); 560 subSelector.freeFDSetBuffer(); 561 } 562 563 @Override implRegister(SelectionKeyImpl ski)564 protected void implRegister(SelectionKeyImpl ski) { 565 ensureOpen(); 566 synchronized (updateLock) { 567 newKeys.addLast(ski); 568 } 569 } 570 growIfNeeded()571 private void growIfNeeded() { 572 if (channelArray.length == totalChannels) { 573 int newSize = totalChannels * 2; // Make a larger array 574 SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; 575 System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); 576 channelArray = temp; 577 pollWrapper.grow(newSize); 578 } 579 if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed 580 pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); 581 totalChannels++; 582 threadsCount++; 583 } 584 } 585 586 @Override implDereg(SelectionKeyImpl ski)587 protected void implDereg(SelectionKeyImpl ski) { 588 assert !ski.isValid(); 589 assert Thread.holdsLock(this); 590 591 if (fdMap.remove(ski) != null) { 592 int i = ski.getIndex(); 593 assert (i >= 0); 594 595 if (i != totalChannels - 1) { 596 // Copy end one over it 597 SelectionKeyImpl endChannel = channelArray[totalChannels-1]; 598 channelArray[i] = endChannel; 599 endChannel.setIndex(i); 600 pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i); 601 } 602 ski.setIndex(-1); 603 604 channelArray[totalChannels - 1] = null; 605 totalChannels--; 606 if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { 607 totalChannels--; 608 threadsCount--; // The last thread has become redundant. 609 } 610 } 611 } 612 613 @Override setEventOps(SelectionKeyImpl ski)614 public void setEventOps(SelectionKeyImpl ski) { 615 ensureOpen(); 616 synchronized (updateLock) { 617 updateKeys.addLast(ski); 618 } 619 } 620 621 @Override wakeup()622 public Selector wakeup() { 623 synchronized (interruptLock) { 624 if (!interruptTriggered) { 625 setWakeupSocket(); 626 interruptTriggered = true; 627 } 628 } 629 return this; 630 } 631 632 static { 633 IOUtil.load(); 634 } 635 } 636