1 /*
2  * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /*
25  * @test
26  * @bug 4636628
27  * @summary HttpURLConnection duplicates HTTP GET requests when used with multiple threads
28 */
29 
30 /*
31  * This tests keep-alive behavior using chunkedinputstreams
32  * It checks that keep-alive connections are used and also
33  * that requests are not being repeated (due to errors)
34  *
35  * It also checks that the keepalive connections are closed eventually
36  * because the test will not terminate if the connections
37  * are not closed by the keep-alive timer.
38  */
39 
40 import java.net.*;
41 import java.io.*;
42 import java.time.Duration;
43 import java.util.Queue;
44 import java.util.concurrent.ConcurrentLinkedQueue;
45 
46 public class MultiThreadTest extends Thread {
47 
48     /*
49      * Is debugging enabled - start with -d to enable.
50      */
51     static boolean debug = true; // disable debug once stability proven
52 
53     static Object threadlock = new Object ();
54     static int threadCounter = 0;
55 
getLock()56     static Object getLock() { return threadlock; }
57 
debug(String msg)58     static void debug(String msg) {
59         if (debug)
60             System.out.println(msg);
61     }
62 
63     static int reqnum = 0;
64 
doRequest(String uri)65     void doRequest(String uri) throws Exception {
66         URL url = new URL(uri + "?foo="+reqnum);
67         reqnum ++;
68         HttpURLConnection http = (HttpURLConnection)url.openConnection();
69 
70         InputStream in = http.getInputStream();
71         byte b[] = new byte[100];
72         int total = 0;
73         int n;
74         do {
75             n = in.read(b);
76             if (n > 0) total += n;
77         } while (n > 0);
78         debug ("client: read " + total + " bytes");
79         in.close();
80         http.disconnect();
81     }
82 
83     String uri;
84     byte[] b;
85     int requests;
86 
MultiThreadTest(int port, int requests)87     MultiThreadTest(int port, int requests) throws Exception {
88         uri = "http://localhost:" + port + "/foo.html";
89 
90         b = new byte [256];
91         this.requests = requests;
92 
93         synchronized (threadlock) {
94             threadCounter ++;
95         }
96     }
97 
run()98     public void run() {
99         long start = System.nanoTime();
100 
101         try {
102             for (int i=0; i<requests; i++) {
103                 doRequest (uri);
104             }
105         } catch (Exception e) {
106             throw new RuntimeException (e.getMessage());
107         } finally {
108             synchronized (threadlock) {
109                 threadCounter --;
110                 if (threadCounter == 0) {
111                     threadlock.notifyAll();
112                 }
113             }
114         }
115         debug("client: end - " + Duration.ofNanos(System.nanoTime() - start));
116     }
117 
118     static int threads=5;
119 
main(String args[])120     public static void main(String args[]) throws Exception {
121         long start = System.nanoTime();
122 
123         int x = 0, arg_len = args.length;
124         int requests = 20;
125 
126         if (arg_len > 0 && args[0].equals("-d")) {
127             debug = true;
128             x = 1;
129             arg_len --;
130         }
131         if (arg_len > 0) {
132             threads = Integer.parseInt (args[x]);
133             requests = Integer.parseInt (args[x+1]);
134         }
135 
136         /* start the server */
137         ServerSocket ss = new ServerSocket(0);
138         Server svr = new Server(ss);
139         svr.start();
140 
141         Object lock = MultiThreadTest.getLock();
142         synchronized (lock) {
143             for (int i=0; i<threads; i++) {
144                 MultiThreadTest t = new MultiThreadTest(ss.getLocalPort(), requests);
145                 t.start ();
146             }
147             try {
148                 lock.wait();
149             } catch (InterruptedException e) {}
150         }
151 
152         // shutdown server - we're done.
153         svr.shutdown();
154 
155         int cnt = svr.connectionCount();
156         MultiThreadTest.debug("Connections = " + cnt);
157         int reqs = Worker.getRequests ();
158         MultiThreadTest.debug("Requests = " + reqs);
159         System.out.println ("Connection count = " + cnt + " Request count = " + reqs);
160         if (cnt > threads) { // could be less
161             throw new RuntimeException ("Expected "+threads + " connections: used " +cnt);
162         }
163         if  (reqs != threads*requests) {
164             throw new RuntimeException ("Expected "+ threads*requests+ " requests: got " +reqs);
165         }
166         for (Thread worker : svr.workers()) {
167             worker.join(60_000);
168         }
169 
170         debug("main thread end - " + Duration.ofNanos(System.nanoTime() - start));
171     }
172 }
173 
174     /*
175      * Server thread to accept connection and create worker threads
176      * to service each connection.
177      */
178     class Server extends Thread {
179         ServerSocket ss;
180         int connectionCount;
181         boolean shutdown = false;
182         private Queue<Worker> workers = new ConcurrentLinkedQueue<>();
183 
Server(ServerSocket ss)184         Server(ServerSocket ss) {
185             this.ss = ss;
186         }
187 
workers()188         public Queue<Worker> workers() {
189             return workers;
190         }
191 
connectionCount()192         public synchronized int connectionCount() {
193             return connectionCount;
194         }
195 
shutdown()196         public synchronized void shutdown() {
197             shutdown = true;
198         }
199 
run()200         public void run() {
201             try {
202                 ss.setSoTimeout(2000);
203 
204                 for (;;) {
205                     Socket s;
206                     try {
207                         MultiThreadTest.debug("server: calling accept.");
208                         s = ss.accept();
209                         MultiThreadTest.debug("server: return accept.");
210                     } catch (SocketTimeoutException te) {
211                         MultiThreadTest.debug("server: STE");
212                         synchronized (this) {
213                             if (shutdown) {
214                                 MultiThreadTest.debug("server: Shuting down.");
215                                 return;
216                             }
217                         }
218                         continue;
219                     }
220 
221                     int id;
222                     Worker w;
223                     synchronized (this) {
224                         id = connectionCount++;
225                         w = new Worker(s, id);
226                         workers.add(w);
227                     }
228                     w.start();
229                     MultiThreadTest.debug("server: Started worker " + id);
230                 }
231 
232             } catch (Exception e) {
233                 e.printStackTrace();
234             } finally {
235                 try {
236                     ss.close();
237                 } catch (Exception e) { }
238             }
239         }
240     }
241 
242     /*
243      * Worker thread to service single connection - can service
244      * multiple http requests on same connection.
245      */
246     class Worker extends Thread {
247         Socket s;
248         int id;
249 
Worker(Socket s, int id)250         Worker(Socket s, int id) {
251             this.s = s;
252             this.id = id;
253         }
254 
255         static int requests = 0;
256         static final Object rlock = new Object();
257 
getRequests()258         public static int getRequests () {
259             synchronized (rlock) {
260                 return requests;
261             }
262         }
incRequests()263         public static void incRequests () {
264             synchronized (rlock) {
265                 requests++;
266             }
267         }
268 
readUntil(InputStream in, char[] seq)269         int readUntil(InputStream in, char[] seq) throws IOException {
270             int i=0, count=0;
271             while (true) {
272                 int c = in.read();
273                 if (c == -1)
274                     return -1;
275                 count++;
276                 if (c == seq[i]) {
277                     i++;
278                     if (i == seq.length)
279                         return count;
280                     continue;
281                 } else {
282                     i = 0;
283                 }
284             }
285         }
286 
run()287         public void run() {
288             long start = System.nanoTime();
289 
290             try {
291                 int max = 400;
292                 byte b[] = new byte[1000];
293                 InputStream in = new BufferedInputStream(s.getInputStream());
294                 // response to client
295                 PrintStream out = new PrintStream(
296                                     new BufferedOutputStream(
297                                                 s.getOutputStream() ));
298 
299                 for (;;) {
300 
301                     // read entire request from client
302                     int n=0;
303 
304                     n = readUntil(in, new char[] {'\r','\n', '\r','\n'});
305 
306                     if (n <= 0) {
307                         MultiThreadTest.debug("worker: " + id + ": Shutdown");
308                         s.close();
309                         return;
310                     }
311 
312                     MultiThreadTest.debug("worker " + id +
313                         ": Read request from client " +
314                         "(" + n + " bytes).");
315 
316                     incRequests();
317                     out.print("HTTP/1.1 200 OK\r\n");
318                     out.print("Transfer-Encoding: chunked\r\n");
319                     out.print("Content-Type: text/html\r\n");
320                     out.print("Connection: Keep-Alive\r\n");
321                     out.print("Keep-Alive: timeout=15, max="+max+"\r\n");
322                     out.print("\r\n");
323                     out.print("6\r\nHello \r\n");
324                     out.print("5\r\nWorld\r\n");
325                     out.print("0\r\n\r\n");
326                     out.flush();
327 
328                     if (--max == 0) {
329                         s.close();
330                         return;
331                     }
332                 }
333 
334             } catch (Exception e) {
335                 e.printStackTrace();
336             } finally {
337                 try {
338                     s.close();
339                 } catch (Exception e) { }
340                 MultiThreadTest.debug("worker: " + id  + " end - " +
341                             Duration.ofNanos(System.nanoTime() - start));
342             }
343         }
344     }
345