1 /*
2  * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.io.IOException;
29 import java.nio.channels.ClosedSelectorException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.nio.channels.spi.SelectorProvider;
33 import java.util.ArrayDeque;
34 import java.util.Deque;
35 import java.util.HashMap;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Consumer;
39 
40 import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_FD;
41 import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_USER;
42 import static sun.nio.ch.SolarisEventPort.SIZEOF_PORT_EVENT;
43 import static sun.nio.ch.SolarisEventPort.OFFSETOF_EVENTS;
44 import static sun.nio.ch.SolarisEventPort.OFFSETOF_SOURCE;
45 import static sun.nio.ch.SolarisEventPort.OFFSETOF_OBJECT;
46 import static sun.nio.ch.SolarisEventPort.port_create;
47 import static sun.nio.ch.SolarisEventPort.port_close;
48 import static sun.nio.ch.SolarisEventPort.port_associate;
49 import static sun.nio.ch.SolarisEventPort.port_dissociate;
50 import static sun.nio.ch.SolarisEventPort.port_getn;
51 import static sun.nio.ch.SolarisEventPort.port_send;
52 
53 /**
54  * Selector implementation based on the Solaris event port mechanism.
55  */
56 
57 class EventPortSelectorImpl
58     extends SelectorImpl
59 {
60     // maximum number of events to retrive in one call to port_getn
61     static final int MAX_EVENTS = Math.min(IOUtil.fdLimit()-1, 1024);
62 
63     // port file descriptor
64     private final int pfd;
65 
66     // the poll array (populated by port_getn)
67     private final long pollArrayAddress;
68     private final AllocatedNativeObject pollArray;
69 
70     // maps file descriptor to selection key, synchronize on selector
71     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
72 
73     // the last update operation, incremented by processUpdateQueue
74     private int lastUpdate;
75 
76     // pending new registrations/updates, queued by setEventOps and
77     // updateSelectedKeys
78     private final Object updateLock = new Object();
79     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
80 
81     // interrupt triggering and clearing
82     private final Object interruptLock = new Object();
83     private boolean interruptTriggered;
84 
EventPortSelectorImpl(SelectorProvider sp)85     EventPortSelectorImpl(SelectorProvider sp) throws IOException {
86         super(sp);
87 
88         this.pfd = port_create();
89 
90         int allocationSize = MAX_EVENTS * SIZEOF_PORT_EVENT;
91         this.pollArray = new AllocatedNativeObject(allocationSize, false);
92         this.pollArrayAddress = pollArray.address();
93     }
94 
ensureOpen()95     private void ensureOpen() {
96         if (!isOpen())
97             throw new ClosedSelectorException();
98     }
99 
100     @Override
doSelect(Consumer<SelectionKey> action, long timeout)101     protected int doSelect(Consumer<SelectionKey> action, long timeout)
102         throws IOException
103     {
104         assert Thread.holdsLock(this);
105 
106         long to = timeout;
107         boolean blocking = (to != 0);
108         boolean timedPoll = (to > 0);
109 
110         int numEvents;
111         processUpdateQueue();
112         processDeregisterQueue();
113         try {
114             begin(blocking);
115 
116             do {
117                 long startTime = timedPoll ? System.nanoTime() : 0;
118                 numEvents = port_getn(pfd, pollArrayAddress, MAX_EVENTS, to);
119                 if (numEvents == IOStatus.INTERRUPTED && timedPoll) {
120                     // timed poll interrupted so need to adjust timeout
121                     long adjust = System.nanoTime() - startTime;
122                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
123                     if (to <= 0) {
124                         // timeout also expired so no retry
125                         numEvents = 0;
126                     }
127                 }
128             } while (numEvents == IOStatus.INTERRUPTED);
129             assert IOStatus.check(numEvents);
130 
131         } finally {
132             end(blocking);
133         }
134         processDeregisterQueue();
135         return processPortEvents(numEvents, action);
136     }
137 
138     /**
139      * Process new registrations and changes to the interest ops.
140      */
processUpdateQueue()141     private void processUpdateQueue() throws IOException {
142         assert Thread.holdsLock(this);
143 
144         // bump lastUpdate to ensure that the interest ops are changed at most
145         // once per bulk update
146         lastUpdate++;
147 
148         synchronized (updateLock) {
149             SelectionKeyImpl ski;
150             while ((ski = updateKeys.pollFirst()) != null) {
151                 if (ski.isValid()) {
152                     int fd = ski.getFDVal();
153                     // add to fdToKey if needed
154                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
155                     assert (previous == null) || (previous == ski);
156 
157                     int newEvents = ski.translateInterestOps();
158                     if (newEvents != ski.registeredEvents()) {
159                         if (newEvents == 0) {
160                             port_dissociate(pfd, PORT_SOURCE_FD, fd);
161                         } else {
162                             port_associate(pfd, PORT_SOURCE_FD, fd, newEvents);
163                         }
164                         ski.registeredEvents(newEvents);
165                     }
166                 }
167             }
168         }
169     }
170 
171     /**
172      * Process the polled events and re-queue the selected keys so the file
173      * descriptors are re-associated at the next select operation.
174      */
processPortEvents(int numEvents, Consumer<SelectionKey> action)175     private int processPortEvents(int numEvents, Consumer<SelectionKey> action)
176         throws IOException
177     {
178         assert Thread.holdsLock(this);
179 
180         int numKeysUpdated = 0;
181         boolean interrupted = false;
182 
183         // Process the polled events while holding the update lock. This allows
184         // keys to be queued for ready file descriptors so they can be
185         // re-associated at the next select. The selected-key can be updated
186         // in this pass.
187         synchronized (updateLock) {
188             for (int i = 0; i < numEvents; i++) {
189                 short source = getSource(i);
190                 if (source == PORT_SOURCE_FD) {
191                     int fd = getDescriptor(i);
192                     SelectionKeyImpl ski = fdToKey.get(fd);
193                     if (ski != null) {
194                         ski.registeredEvents(0);
195                         updateKeys.addLast(ski);
196 
197                         // update selected-key set if no action specified
198                         if (action == null) {
199                             int rOps = getEventOps(i);
200                             numKeysUpdated += processReadyEvents(rOps, ski, null);
201                         }
202 
203                     }
204                 } else if (source == PORT_SOURCE_USER) {
205                     interrupted = true;
206                 } else {
207                     assert false;
208                 }
209             }
210         }
211 
212         // if an action specified then iterate over the polled events again so
213         // that the action is performed without holding the update lock.
214         if (action != null) {
215             for (int i = 0; i < numEvents; i++) {
216                 short source = getSource(i);
217                 if (source == PORT_SOURCE_FD) {
218                     int fd = getDescriptor(i);
219                     SelectionKeyImpl ski = fdToKey.get(fd);
220                     if (ski != null) {
221                         int rOps = getEventOps(i);
222                         numKeysUpdated += processReadyEvents(rOps, ski, action);
223                     }
224                 }
225             }
226         }
227 
228         if (interrupted) {
229             clearInterrupt();
230         }
231         return numKeysUpdated;
232     }
233 
234     @Override
implClose()235     protected void implClose() throws IOException {
236         assert !isOpen();
237         assert Thread.holdsLock(this);
238 
239         // prevent further wakeup
240         synchronized (interruptLock) {
241             interruptTriggered = true;
242         }
243 
244         port_close(pfd);
245         pollArray.free();
246     }
247 
248     @Override
implDereg(SelectionKeyImpl ski)249     protected void implDereg(SelectionKeyImpl ski) throws IOException {
250         assert !ski.isValid();
251         assert Thread.holdsLock(this);
252 
253         int fd = ski.getFDVal();
254         if (fdToKey.remove(fd) != null) {
255             if (ski.registeredEvents() != 0) {
256                 port_dissociate(pfd, PORT_SOURCE_FD, fd);
257                 ski.registeredEvents(0);
258             }
259         } else {
260             assert ski.registeredEvents() == 0;
261         }
262     }
263 
264     @Override
setEventOps(SelectionKeyImpl ski)265     public void setEventOps(SelectionKeyImpl ski) {
266         ensureOpen();
267         synchronized (updateLock) {
268             updateKeys.addLast(ski);
269         }
270     }
271 
272     @Override
wakeup()273     public Selector wakeup() {
274         synchronized (interruptLock) {
275             if (!interruptTriggered) {
276                 try {
277                     port_send(pfd, 0);
278                 } catch (IOException ioe) {
279                     throw new InternalError(ioe);
280                 }
281                 interruptTriggered = true;
282             }
283         }
284         return this;
285     }
286 
clearInterrupt()287     private void clearInterrupt() throws IOException {
288         synchronized (interruptLock) {
289             interruptTriggered = false;
290         }
291     }
292 
getSource(int i)293     private short getSource(int i) {
294         int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_SOURCE;
295         return pollArray.getShort(offset);
296     }
297 
getEventOps(int i)298     private int getEventOps(int i) {
299         int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_EVENTS;
300         return pollArray.getInt(offset);
301     }
302 
getDescriptor(int i)303     private int getDescriptor(int i) {
304         //assert Unsafe.getUnsafe().addressSize() == 8;
305         int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_OBJECT;
306         return (int) pollArray.getLong(offset);
307     }
308 }
309