1 /*
2  * Copyright 2009, 2019, Google Inc.  All Rights Reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27 import java.nio.channels.SelectionKey;
28 import java.nio.channels.Selector;
29 import java.nio.channels.ServerSocketChannel;
30 import java.nio.channels.SocketChannel;
31 import java.util.ArrayList;
32 import java.util.Iterator;
33 import java.util.List;
34 
35 /**
36  * Reproduces O(N^2) behavior of JDK6/7 select() call. This happens when
37  * a selector has many unprocessed updates to its interest set (e.g. adding
38  * OP_READ on a bunch of newly accepted sockets). The O(N^2) is triggered
39  * by cancelling a number of selection keys (or just closing a few sockets).
40  * In this case, select() will first go through the list of cancelled keys
41  * and try to deregister them. That deregistration is O(N^2) over the list
42  * of unprocessed updates to the interest set.
43  *
44  * <p> This O(N^2) behavior is a BUG in JVM and should be fixed.
45  *
46  * <p> The test first creates initCount connections, and adds them
47  * to the server epoll set. It then creates massCount connections,
48  * registers interest (causing updateList to be populated with massCount*2
49  * elements), but does not add them to epoll set (that would've cleared
50  * updateList). The test then closes initCount connections, thus populating
51  * deregistration queue. The subsequent call to selectNow() will first process
52  * deregistration queue, performing O(N^2) over updateList size,
53  * equal to massCount * 2.
54  *
55  * <p> Note that connect rate is artificially slowed down to compensate
56  * for what I believe is a Linux bug, where too high of a connection rate
57  * ends up in SYN's being dropped and then slow retransmits.
58  *
59  * @author Igor Chernyshev
60  */
public class LotsOfCancels {

    /** Backlog requested when binding the server socket. */
    private static final int ACCEPT_BACKLOG = 5000;

    /** How many consecutive empty accept() results are tolerated. */
    private static final int MAX_ACCEPT_RETRIES = 10;

    /** Pause between accept() retries, in milliseconds. */
    private static final long ACCEPT_RETRY_PAUSE_MILLIS = 500;

    /** Maximum number of client connections started in one batch. */
    private static final int CONNECT_BATCH_SIZE = 20;

    /** Pause after each batch of client connects, in milliseconds. */
    private static final long CONNECT_BATCH_PAUSE_MILLIS = 200;

    /** Value of System.nanoTime() when the test started; used by log(). */
    static long testStartTime;

    public static void main(String[] args) throws Exception {
        // the final select should run in less than 1000ms.
        runTest(500, 2700, 1000);
    }

    /** Prints msg to standard output, prefixed with the elapsed test time. */
    static void log(String msg) {
        System.out.println(getLogPrefix() + msg);
    }

    /** Returns the log prefix: milliseconds since testStartTime, then ": ". */
    static String getLogPrefix() {
        return durationMillis(testStartTime) + ": ";
    }

    /**
     * Returns the elapsed time since startNanos, in milliseconds.
     * @param startNanos the start time; this must be a value returned
     * by {@link System#nanoTime}
     */
    static long durationMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / (1000L * 1000L);
    }

    /**
     * Runs the scenario: accepts initCount connections and flushes them
     * through selectNow(), accepts massCount more without selecting, closes
     * the initial ones, and then times a single selectNow().
     *
     * <p> If that final selectNow() takes longer than maxSelectTime, a
     * failure message is printed and the JVM exits with status 1.
     *
     * @param initCount number of connections that are registered, selected
     *        and later closed
     * @param massCount number of connections that are registered but not
     *        selected before the timed call
     * @param maxSelectTime upper bound for the timed selectNow(), in
     *        milliseconds
     * @throws Exception if fewer connections than requested could be
     *         accepted, or if any channel operation fails
     */
    static void runTest(int initCount, int massCount, int maxSelectTime)
            throws Exception {
        testStartTime = System.nanoTime();

        // Create server channel, add it to selector and run epoll_ctl.
        log("Setting up server");
        try (Selector serverSelector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Bind to an ephemeral port so the test cannot collide with
            // another process (or another run) that holds a fixed port.
            server.socket().bind(
                new InetSocketAddress(InetAddress.getLoopbackAddress(), 0),
                ACCEPT_BACKLOG);
            SocketAddress address = server.getLocalAddress();
            server.register(serverSelector, SelectionKey.OP_ACCEPT);
            serverSelector.selectNow();

            log("Setting up client");
            ClientThread client = new ClientThread(address);
            client.start();
            Thread.sleep(100);

            // Set up initial set of client sockets.
            log("Starting initial client connections");
            client.connectClients(initCount);
            Thread.sleep(500);  // Wait for client connections to arrive

            // Accept all initial client sockets, add to selector and run
            // epoll_ctl.
            log("Accepting initial connections");
            List<SocketChannel> serverChannels1 =
                acceptAndAddAll(serverSelector, server, initCount);
            if (serverChannels1.size() != initCount) {
                throw new Exception("Accepted " + serverChannels1.size() +
                                    " instead of " + initCount);
            }
            serverSelector.selectNow();

            // Set up mass set of client sockets.
            log("Requesting mass client connections");
            client.connectClients(massCount);
            Thread.sleep(500);  // Wait for client connections to arrive

            // Accept all mass client sockets, add to selector and do NOT
            // run epoll_ctl.
            log("Accepting mass connections");
            List<SocketChannel> serverChannels2 =
                acceptAndAddAll(serverSelector, server, massCount);
            if (serverChannels2.size() != massCount) {
                throw new Exception("Accepted " + serverChannels2.size() +
                                    " instead of " + massCount);
            }

            // Close initial set of sockets.
            log("Closing initial connections");
            closeAll(serverChannels1);

            // Now get the timing of select() call.
            log("Running the final select call");
            long startTime = System.nanoTime();
            serverSelector.selectNow();
            long duration = durationMillis(startTime);
            log("Init count = " + initCount +
                ", mass count = " + massCount +
                ", duration = " + duration + "ms");

            if (duration > maxSelectTime) {
                System.out.println
                    ("\n\n\n\n\nFAILURE: The final selectNow() took " +
                     duration + "ms " +
                     "- seems like O(N^2) bug is still here\n\n");
                System.exit(1);
            }

            // The measurement is done; release the remaining accepted
            // sockets. The selector and server channel are closed by the
            // enclosing try-with-resources.
            closeAll(serverChannels2);
        }
    }

    /**
     * Accepts up to {@code expected} connections from {@code server} and
     * registers each with {@code selector}, first for OP_READ and then,
     * in a second pass, for OP_WRITE.
     *
     * <p> When accept() returns null the method sleeps and retries; it
     * gives up after MAX_ACCEPT_RETRIES consecutive empty attempts, so the
     * returned list may be shorter than {@code expected}.
     *
     * @param selector the selector to register accepted channels with
     * @param server the non-blocking server channel to accept from
     * @param expected the number of connections to accept
     * @return the accepted channels, in accept order
     * @throws Exception if accepting or registering a channel fails
     */
    static List<SocketChannel> acceptAndAddAll(Selector selector,
                                               ServerSocketChannel server,
                                               int expected)
            throws Exception {
        int retryCount = 0;
        int acceptCount = 0;
        List<SocketChannel> channels = new ArrayList<>();
        while (channels.size() < expected) {
            SocketChannel channel = server.accept();
            if (channel == null) {
                log("accept() returned null " +
                    "after accepting " + acceptCount + " more connections");
                acceptCount = 0;
                if (retryCount < MAX_ACCEPT_RETRIES) {
                    // See if more new sockets got stacked behind.
                    retryCount++;
                    Thread.sleep(ACCEPT_RETRY_PAUSE_MILLIS);
                    continue;
                }
                break;
            }
            retryCount = 0;
            acceptCount++;
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            channels.add(channel);
        }
        // Cause an additional updateList entry per channel.
        for (SocketChannel channel : channels) {
            channel.register(selector, SelectionKey.OP_WRITE);
        }
        return channels;
    }

    /**
     * Closes every channel in the list, in order.
     *
     * @throws Exception if closing a channel fails; later channels in the
     *         list are then left open
     */
    static void closeAll(List<SocketChannel> channels)
            throws Exception {
        for (SocketChannel channel : channels) {
            channel.close();
        }
    }

    /**
     * Daemon thread that opens non-blocking client connections to the
     * given address on request and completes them through its own selector.
     */
    static class ClientThread extends Thread {
        private final SocketAddress address;
        private final Selector selector;
        /** Connections requested but not yet started; guarded by this. */
        private int connectionsNeeded;
        /** Connections started so far; only touched by this thread. */
        private int totalCreated;

        ClientThread(SocketAddress address) throws Exception {
            this.address = address;
            selector = Selector.open();
            setDaemon(true);
        }

        /**
         * Requests {@code count} additional client connections and wakes
         * the client thread up so that it starts creating them.
         */
        void connectClients(int count) throws Exception {
            synchronized (this) {
                connectionsNeeded += count;
            }
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                handleClients();
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        /**
         * Main loop: starts pending connections in small batches, pausing
         * after each batch, and otherwise blocks in select() until woken.
         */
        private void handleClients() throws Exception {
            int selectCount = 0;
            while (true) {
                int createdCount = startConnectBatch();

                if (createdCount > 0) {
                    log("Started total of " +
                        totalCreated + " client connections");
                    // Throttle the connect rate. The pause happens outside
                    // the monitor so connectClients() is never blocked by it.
                    Thread.sleep(CONNECT_BATCH_PAUSE_MILLIS);
                    selector.selectNow();
                } else {
                    selectCount++;
                    long startTime = System.nanoTime();
                    selector.select();
                    long duration = durationMillis(startTime);
                    log("Exited clientSelector.select(), loop #"
                        + selectCount + ", duration = " + duration + "ms");
                }

                finishPendingConnects();
            }
        }

        /**
         * Claims up to CONNECT_BATCH_SIZE pending connection requests and
         * starts a non-blocking connect for each of them.
         *
         * @return the number of connections started (possibly zero)
         */
        private int startConnectBatch() throws Exception {
            int batch;
            synchronized (this) {
                // Only the counter update needs the lock.
                batch = Math.max(0,
                    Math.min(connectionsNeeded, CONNECT_BATCH_SIZE));
                connectionsNeeded -= batch;
            }

            for (int i = 0; i < batch; i++) {
                totalCreated++;

                SocketChannel channel = SocketChannel.open();
                channel.configureBlocking(false);
                channel.connect(address);
                // Register only if the connect did not complete right away.
                if (!channel.finishConnect()) {
                    channel.register(selector,
                                     SelectionKey.OP_CONNECT);
                }
            }
            return batch;
        }

        /**
         * Drains the selected-key set, finishing the connect of every key
         * that is ready for exactly OP_CONNECT and logging anything else.
         */
        private void finishPendingConnects() throws Exception {
            int keyCount = -1;
            Iterator<SelectionKey> keys =
                selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keyCount++;
                keys.remove();
                if (!key.isValid()) {
                    log("Ignoring client key #" + keyCount);
                    continue;
                }
                int readyOps = key.readyOps();
                if (readyOps == SelectionKey.OP_CONNECT) {
                    // Clear the interest set so the key is not selected again.
                    key.interestOps(0);
                    ((SocketChannel) key.channel()).finishConnect();
                } else {
                    log("readyOps() on client key #" + keyCount +
                        " returned " + readyOps);
                }
            }
        }
    }
}
294