1 /* 2 * Copyright (c) 2001, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 4313882 7183800 26 * @summary Test DatagramChannel's send and receive methods 27 */ 28 29 import java.io.*; 30 import java.net.*; 31 import java.nio.*; 32 import java.nio.channels.*; 33 import java.nio.charset.*; 34 import java.util.concurrent.ExecutorService; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.concurrent.CompletionException; 38 import java.util.stream.Stream; 39 40 public class Connect { 41 42 static PrintStream log = System.err; 43 main(String[] args)44 public static void main(String[] args) throws Exception { 45 test(); 46 } 47 test()48 static void test() throws Exception { 49 ExecutorService threadPool = Executors.newCachedThreadPool(); 50 try (Responder r = new Responder(); 51 Initiator a = new Initiator(r.getSocketAddress()) 52 ) { 53 invoke(threadPool, a, r); 54 } finally { 55 threadPool.shutdown(); 56 } 57 } 58 invoke(ExecutorService e, Runnable reader, Runnable writer)59 static void invoke(ExecutorService e, Runnable reader, Runnable writer) throws CompletionException { 60 CompletableFuture<Void> f1 = CompletableFuture.runAsync(writer, e); 61 CompletableFuture<Void> f2 = CompletableFuture.runAsync(reader, e); 62 wait(f1, f2); 63 } 64 65 66 // This method waits for either one of the given futures to complete exceptionally 67 // or for all of the given futures to complete successfully. wait(CompletableFuture<?>.... futures)68 private static void wait(CompletableFuture<?>... futures) throws CompletionException { 69 CompletableFuture<?> future = CompletableFuture.allOf(futures); 70 Stream.of(futures) 71 .forEach(f -> f.exceptionally(ex -> { 72 future.completeExceptionally(ex); 73 return null; 74 })); 75 future.join(); 76 } 77 toConnectAddress(SocketAddress address)78 private static SocketAddress toConnectAddress(SocketAddress address) { 79 if (address instanceof InetSocketAddress) { 80 var inet = (InetSocketAddress) address; 81 if (inet.getAddress().isAnyLocalAddress()) { 82 // if the peer is bound to the wildcard address, use 83 // the loopback address to connect. 84 var loopback = InetAddress.getLoopbackAddress(); 85 return new InetSocketAddress(loopback, inet.getPort()); 86 } 87 } 88 return address; 89 } 90 91 public static class Initiator implements AutoCloseable, Runnable { 92 final SocketAddress connectSocketAddress; 93 final DatagramChannel dc; 94 Initiator(SocketAddress peerSocketAddress)95 Initiator(SocketAddress peerSocketAddress) throws IOException { 96 this.connectSocketAddress = toConnectAddress(peerSocketAddress); 97 dc = DatagramChannel.open(); 98 } 99 run()100 public void run() { 101 try { 102 ByteBuffer bb = ByteBuffer.allocateDirect(256); 103 bb.put("hello".getBytes()); 104 bb.flip(); 105 log.println("Initiator connecting to " + connectSocketAddress); 106 dc.connect(connectSocketAddress); 107 108 // Send a message 109 log.println("Initiator attempting to write to Responder at " + connectSocketAddress.toString()); 110 dc.write(bb); 111 112 // Try to send to some other address 113 try { 114 int port = dc.socket().getLocalPort(); 115 InetAddress loopback = InetAddress.getLoopbackAddress(); 116 InetSocketAddress otherAddress = new InetSocketAddress(loopback, (port == 3333 ? 3332 : 3333)); 117 log.println("Testing if Initiator throws AlreadyConnectedException" + otherAddress.toString()); 118 dc.send(bb, otherAddress); 119 throw new RuntimeException("Initiator allowed send to other address while already connected"); 120 } catch (AlreadyConnectedException ace) { 121 // Correct behavior 122 } 123 124 // Read a reply 125 bb.flip(); 126 log.println("Initiator waiting to read"); 127 dc.read(bb); 128 bb.flip(); 129 CharBuffer cb = StandardCharsets.US_ASCII. 130 newDecoder().decode(bb); 131 log.println("Initiator received from Responder at " + connectSocketAddress + ": " + cb); 132 } catch (Exception ex) { 133 log.println("Initiator threw exception: " + ex); 134 throw new RuntimeException(ex); 135 } finally { 136 log.println("Initiator finished"); 137 } 138 } 139 140 @Override close()141 public void close() throws IOException { 142 dc.close(); 143 } 144 } 145 146 public static class Responder implements AutoCloseable, Runnable { 147 final DatagramChannel dc; 148 Responder()149 Responder() throws IOException { 150 var address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); 151 dc = DatagramChannel.open().bind(address); 152 } 153 getSocketAddress()154 SocketAddress getSocketAddress() throws IOException { 155 return dc.getLocalAddress(); 156 } 157 run()158 public void run() { 159 try { 160 // Listen for a message 161 ByteBuffer bb = ByteBuffer.allocateDirect(100); 162 log.println("Responder waiting to receive"); 163 SocketAddress sa = dc.receive(bb); 164 bb.flip(); 165 CharBuffer cb = StandardCharsets.US_ASCII. 166 newDecoder().decode(bb); 167 log.println("Responder received from Initiator at" + sa + ": " + cb); 168 169 // Reply to sender 170 dc.connect(sa); 171 bb.flip(); 172 log.println("Responder attempting to write: " + dc.getRemoteAddress().toString()); 173 dc.write(bb); 174 } catch (Exception ex) { 175 log.println("Responder threw exception: " + ex); 176 throw new RuntimeException(ex); 177 } finally { 178 log.println("Responder finished"); 179 } 180 } 181 182 @Override close()183 public void close() throws IOException { 184 dc.close(); 185 } 186 } 187 } 188