1 /*
2  * Copyright (c) 2001, 2020, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /* @test
25  * @bug 4313882 7183800
26  * @summary Test DatagramChannel's send and receive methods
27  */
28 
29 import java.io.*;
30 import java.net.*;
31 import java.nio.*;
32 import java.nio.channels.*;
33 import java.nio.charset.*;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.concurrent.CompletionException;
38 import java.util.stream.Stream;
39 
/**
 * Exercises a connected {@link DatagramChannel} pair over the loopback
 * address: an {@link Initiator} connects to a {@link Responder}, writes a
 * message, checks that sending to a different address is rejected with
 * {@link AlreadyConnectedException}, and then reads the Responder's reply.
 */
public class Connect {

    /** Destination for the diagnostic output produced by the test. */
    static PrintStream log = System.err;

    /**
     * Entry point; runs the test once.
     *
     * @param args ignored
     * @throws Exception if the test fails
     */
    public static void main(String[] args) throws Exception {
        test();
    }

    /**
     * Creates a Responder and an Initiator aimed at it, runs both concurrently
     * and waits for the outcome. Both channels are closed and the thread pool
     * is shut down whether or not the run succeeds.
     *
     * @throws Exception if a channel cannot be created or closed, or if either
     *         side fails (reported as a {@link CompletionException})
     */
    static void test() throws Exception {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        try (Responder r = new Responder();
             Initiator a = new Initiator(r.getSocketAddress())
        ) {
            invoke(threadPool, a, r);
        } finally {
            threadPool.shutdown();
        }
    }

    /**
     * Runs the writer and the reader on the given executor and blocks until
     * both have completed successfully or either one has failed.
     *
     * @param e      executor that runs both tasks
     * @param reader task submitted second
     * @param writer task submitted first
     * @throws CompletionException if either task completes exceptionally
     */
    static void invoke(ExecutorService e, Runnable reader, Runnable writer) throws CompletionException {
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(writer, e);
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(reader, e);
        wait(f1, f2);
    }

    /**
     * Waits for either one of the given futures to complete exceptionally
     * or for all of the given futures to complete successfully.
     *
     * @param futures the futures to wait for
     * @throws CompletionException if any of the futures completes exceptionally
     */
    private static void wait(CompletableFuture<?>... futures) throws CompletionException {
        CompletableFuture<?> future = CompletableFuture.allOf(futures);
        // allOf only completes once every future is done; propagate the first
        // failure immediately so a still-blocked peer cannot stall the join.
        Stream.of(futures)
                .forEach(f -> f.exceptionally(ex -> {
                    future.completeExceptionally(ex);
                    return null;
                }));
        future.join();
    }

    /**
     * Returns an address suitable for connecting to a peer bound to
     * {@code address}.
     *
     * @param address the peer's bound address
     * @return the loopback address with the same port if {@code address} is an
     *         {@link InetSocketAddress} on the wildcard address; otherwise
     *         {@code address} itself
     */
    private static SocketAddress toConnectAddress(SocketAddress address) {
        if (address instanceof InetSocketAddress) {
            var inet = (InetSocketAddress) address;
            if (inet.getAddress().isAnyLocalAddress()) {
                // if the peer is bound to the wildcard address, use
                // the loopback address to connect.
                var loopback = InetAddress.getLoopbackAddress();
                return new InetSocketAddress(loopback, inet.getPort());
            }
        }
        return address;
    }

    /**
     * Picks a port that is guaranteed to differ from the given one.
     *
     * @param connectedPort the port that must be avoided
     * @return 3332 if {@code connectedPort} is 3333, otherwise 3333
     */
    static int pickOtherPort(int connectedPort) {
        return connectedPort == 3333 ? 3332 : 3333;
    }

    /**
     * The connecting side of the exchange: connects to the peer, writes a
     * message, verifies that an unconnected-style send is rejected, and reads
     * the reply.
     */
    public static class Initiator implements AutoCloseable, Runnable {
        final SocketAddress connectSocketAddress;
        final DatagramChannel dc;

        /**
         * Opens a datagram channel that will later be connected to the peer.
         *
         * @param peerSocketAddress the address the peer is bound to
         * @throws IOException if the channel cannot be opened
         */
        Initiator(SocketAddress peerSocketAddress) throws IOException {
            this.connectSocketAddress = toConnectAddress(peerSocketAddress);
            dc = DatagramChannel.open();
        }

        /**
         * Performs the Initiator's side of the exchange.
         *
         * @throws RuntimeException wrapping any exception raised along the way,
         *         including the failure raised when a send to another address
         *         is not rejected
         */
        @Override
        public void run() {
            try {
                ByteBuffer bb = ByteBuffer.allocateDirect(256);
                // Encode explicitly: both sides decode the payload as US-ASCII.
                bb.put("hello".getBytes(StandardCharsets.US_ASCII));
                bb.flip();
                log.println("Initiator connecting to " + connectSocketAddress);
                dc.connect(connectSocketAddress);

                // Send a message
                log.println("Initiator attempting to write to Responder at " + connectSocketAddress);
                dc.write(bb);

                // Try to send to some other address
                try {
                    // The target must differ from the connected peer, so the
                    // port to avoid is the remote one, not our own local port.
                    int remotePort = ((InetSocketAddress) dc.getRemoteAddress()).getPort();
                    InetAddress loopback = InetAddress.getLoopbackAddress();
                    InetSocketAddress otherAddress =
                            new InetSocketAddress(loopback, pickOtherPort(remotePort));
                    log.println("Testing if Initiator throws AlreadyConnectedException " + otherAddress);
                    dc.send(bb, otherAddress);
                    throw new RuntimeException("Initiator allowed send to other address while already connected");
                } catch (AlreadyConnectedException ace) {
                    // Correct behavior
                }

                // Read a reply
                // clear() rather than flip(): make the whole buffer available
                // so the reply is not limited to the size of what was sent.
                bb.clear();
                log.println("Initiator waiting to read");
                dc.read(bb);
                bb.flip();
                CharBuffer cb = StandardCharsets.US_ASCII.
                        newDecoder().decode(bb);
                log.println("Initiator received from Responder at " + connectSocketAddress + ": " + cb);
            } catch (Exception ex) {
                log.println("Initiator threw exception: " + ex);
                throw new RuntimeException(ex);
            } finally {
                log.println("Initiator finished");
            }
        }

        /**
         * Closes the underlying channel.
         *
         * @throws IOException if closing the channel fails
         */
        @Override
        public void close() throws IOException {
            dc.close();
        }
    }

    /**
     * The listening side of the exchange: receives one datagram, connects to
     * its sender and writes the same bytes back.
     */
    public static class Responder implements AutoCloseable, Runnable {
        final DatagramChannel dc;

        /**
         * Opens a datagram channel bound to the loopback address on a
         * system-chosen port.
         *
         * @throws IOException if the channel cannot be opened or bound
         */
        Responder() throws IOException {
            var address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
            DatagramChannel channel = DatagramChannel.open();
            try {
                channel.bind(address);
            } catch (IOException | RuntimeException ex) {
                // Do not leak the freshly opened channel if binding fails.
                try {
                    channel.close();
                } catch (IOException closeFailure) {
                    ex.addSuppressed(closeFailure);
                }
                throw ex;
            }
            dc = channel;
        }

        /**
         * Returns the address this Responder's channel is bound to.
         *
         * @return the channel's local address
         * @throws IOException if the channel is closed or the address cannot
         *         be obtained
         */
        SocketAddress getSocketAddress() throws IOException {
            return dc.getLocalAddress();
        }

        /**
         * Performs the Responder's side of the exchange.
         *
         * @throws RuntimeException wrapping any exception raised along the way
         */
        @Override
        public void run() {
            try {
                // Listen for a message
                ByteBuffer bb = ByteBuffer.allocateDirect(100);
                log.println("Responder waiting to receive");
                SocketAddress sa = dc.receive(bb);
                bb.flip();
                CharBuffer cb = StandardCharsets.US_ASCII.
                        newDecoder().decode(bb);
                log.println("Responder received from Initiator at " + sa + ": " + cb);

                // Reply to sender
                dc.connect(sa);
                // Decoding consumed the buffer up to its limit; rewind so the
                // same bytes are written back.
                bb.rewind();
                log.println("Responder attempting to write: " + dc.getRemoteAddress());
                dc.write(bb);
            } catch (Exception ex) {
                log.println("Responder threw exception: " + ex);
                throw new RuntimeException(ex);
            } finally {
                log.println("Responder finished");
            }
        }

        /**
         * Closes the underlying channel.
         *
         * @throws IOException if closing the channel fails
         */
        @Override
        public void close() throws IOException {
            dc.close();
        }
    }
}
188