1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.zookeeper.server.util; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.io.OutputStream; 24 import java.net.ConnectException; 25 import java.net.ServerSocket; 26 import java.net.Socket; 27 import java.net.SocketException; 28 import java.net.SocketTimeoutException; 29 import java.util.ArrayList; 30 import java.util.List; 31 import java.util.concurrent.ExecutorService; 32 import java.util.concurrent.Executors; 33 import java.util.concurrent.TimeUnit; 34 import org.slf4j.Logger; 35 import org.slf4j.LoggerFactory; 36 37 /** 38 * A utility that does bi-directional forwarding between two ports. 39 * Useful, for example, to simulate network failures. 40 * Example: 41 * 42 * Server 1 config file: 43 * 44 * server.1=127.0.0.1:7301:7401;8201 45 * server.2=127.0.0.1:7302:7402;8202 46 * server.3=127.0.0.1:7303:7403;8203 47 * 48 * Server 2 and 3 config files: 49 * 50 * server.1=127.0.0.1:8301:8401;8201 51 * server.2=127.0.0.1:8302:8402;8202 52 * server.3=127.0.0.1:8303:8403;8203 53 * 54 * Initially forward traffic between 730x and 830x and between 740x and 830x 55 * This way server 1 can communicate with servers 2 and 3 56 * .... 57 * 58 * List<PortForwarder> pfs = startForwarding(); 59 * .... 60 * // simulate a network interruption for server 1 61 * stopForwarding(pfs); 62 * .... 63 * // restore connection 64 * pfs = startForwarding(); 65 * 66 * 67 * private List<PortForwarder> startForwarding() throws IOException { 68 * List<PortForwarder> res = new ArrayList<PortForwarder>(); 69 * res.add(new PortForwarder(8301, 7301)); 70 * res.add(new PortForwarder(8401, 7401)); 71 * res.add(new PortForwarder(7302, 8302)); 72 * res.add(new PortForwarder(7402, 8402)); 73 * res.add(new PortForwarder(7303, 8303)); 74 * res.add(new PortForwarder(7403, 8403)); 75 * return res; 76 * } 77 * 78 * private void stopForwarding(List<PortForwarder> pfs) throws Exception { 79 * for (PortForwarder pf : pfs) { 80 * pf.shutdown(); 81 * } 82 * } 83 * 84 * 85 */ 86 public class PortForwarder extends Thread { 87 88 private static final Logger LOG = LoggerFactory.getLogger(PortForwarder.class); 89 90 private static class PortForwardWorker implements Runnable { 91 92 private final InputStream in; 93 private final OutputStream out; 94 private final Socket toClose; 95 private final Socket toClose2; 96 private boolean isFinished = false; 97 PortForwardWorker(Socket toClose, Socket toClose2, InputStream in, OutputStream out)98 PortForwardWorker(Socket toClose, Socket toClose2, InputStream in, OutputStream out) { 99 this.toClose = toClose; 100 this.toClose2 = toClose2; 101 this.in = in; 102 this.out = out; 103 // LOG.info("starting forward for "+toClose); 104 } 105 run()106 public void run() { 107 Thread.currentThread().setName(toClose.toString() + "-->" + toClose2.toString()); 108 byte[] buf = new byte[1024]; 109 try { 110 while (true) { 111 try { 112 int read = this.in.read(buf); 113 if (read > 0) { 114 try { 115 this.out.write(buf, 0, read); 116 } catch (IOException e) { 117 LOG.warn("exception during write", e); 118 break; 119 } 120 } else if (read < 0) { 121 throw new IOException("read " + read); 122 } 123 } catch (SocketTimeoutException e) { 124 LOG.error("socket timeout", e); 125 } 126 } 127 Thread.sleep(1); 128 } catch (InterruptedException e) { 129 LOG.warn("Interrupted", e); 130 } catch (SocketException e) { 131 if (!"Socket closed".equals(e.getMessage())) { 132 LOG.error("Unexpected exception", e); 133 } 134 } catch (IOException e) { 135 LOG.error("Unexpected exception", e); 136 } finally { 137 shutdown(); 138 } 139 LOG.info("Shutting down forward for {}", toClose); 140 isFinished = true; 141 } 142 waitForShutdown(long timeoutMs)143 boolean waitForShutdown(long timeoutMs) throws InterruptedException { 144 synchronized (this) { 145 if (!isFinished) { 146 this.wait(timeoutMs); 147 } 148 } 149 return isFinished; 150 } 151 shutdown()152 public void shutdown() { 153 try { 154 toClose.close(); 155 } catch (IOException ex) { 156 // ignore 157 } 158 try { 159 toClose2.close(); 160 } catch (IOException ex) { 161 // ignore silently 162 } 163 } 164 165 } 166 167 private volatile boolean stopped = false; 168 private ExecutorService workerExecutor = Executors.newCachedThreadPool(); 169 private List<PortForwardWorker> workers = new ArrayList<>(); 170 private ServerSocket serverSocket; 171 private final int to; 172 PortForwarder(int from, int to)173 public PortForwarder(int from, int to) throws IOException { 174 this.to = to; 175 serverSocket = new ServerSocket(from); 176 serverSocket.setSoTimeout(30000); 177 this.start(); 178 } 179 180 @Override run()181 public void run() { 182 try { 183 while (!stopped) { 184 Socket sock = null; 185 try { 186 LOG.info("accepting socket local:{} to:{}", serverSocket.getLocalPort(), to); 187 sock = serverSocket.accept(); 188 LOG.info("accepted: local:{} from:{} to:{}", sock.getLocalPort(), sock.getPort(), to); 189 Socket target = null; 190 int retry = 10; 191 while (sock.isConnected()) { 192 try { 193 target = new Socket("localhost", to); 194 break; 195 } catch (IOException e) { 196 if (retry == 0) { 197 throw e; 198 } 199 LOG.warn( 200 "connection failed, retrying({}): local:{} from:{} to:{}", 201 retry, 202 sock.getLocalPort(), 203 sock.getPort(), 204 to, 205 e); 206 } 207 Thread.sleep(TimeUnit.SECONDS.toMillis(1)); 208 retry--; 209 } 210 LOG.info("connected: local:{} from:{} to:{}", sock.getLocalPort(), sock.getPort(), to); 211 sock.setSoTimeout(30000); 212 target.setSoTimeout(30000); 213 214 workers.add(new PortForwardWorker(sock, target, sock.getInputStream(), target.getOutputStream())); 215 workers.add(new PortForwardWorker(target, sock, target.getInputStream(), sock.getOutputStream())); 216 for (PortForwardWorker worker : workers) { 217 workerExecutor.submit(worker); 218 } 219 } catch (SocketTimeoutException e) { 220 LOG.warn("socket timed out", e); 221 } catch (ConnectException e) { 222 LOG.warn( 223 "connection exception local:{} from:{} to:{}", 224 sock.getLocalPort(), 225 sock.getPort(), 226 to, 227 e); 228 sock.close(); 229 } catch (IOException e) { 230 if (!"Socket closed".equals(e.getMessage())) { 231 LOG.warn( 232 "unexpected exception local:{} from:{} to:{}", 233 sock.getLocalPort(), 234 sock.getPort(), 235 to, 236 e); 237 throw e; 238 } 239 } 240 241 } 242 } catch (IOException e) { 243 LOG.error("Unexpected exception to:{}", to, e); 244 } catch (InterruptedException e) { 245 LOG.error("Interrupted to:{}", to, e); 246 } 247 } 248 shutdown()249 public void shutdown() throws Exception { 250 this.stopped = true; 251 this.serverSocket.close(); 252 this.join(); 253 this.workerExecutor.shutdownNow(); 254 for (PortForwardWorker worker : workers) { 255 worker.shutdown(); 256 } 257 258 for (PortForwardWorker worker : workers) { 259 if (!worker.waitForShutdown(5000)) { 260 throw new Exception("Failed to stop forwarding within 5 seconds"); 261 } 262 } 263 } 264 265 } 266