1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper.server.util;
20 
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.net.ConnectException;
25 import java.net.ServerSocket;
26 import java.net.Socket;
27 import java.net.SocketException;
28 import java.net.SocketTimeoutException;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.TimeUnit;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 
37 /**
38  * A utility that does bi-directional forwarding between two ports.
39  * Useful, for example, to simulate network failures.
40  * Example:
41  *
42  *   Server 1 config file:
43  *
44  *      server.1=127.0.0.1:7301:7401;8201
45  *      server.2=127.0.0.1:7302:7402;8202
46  *      server.3=127.0.0.1:7303:7403;8203
47  *
48  *   Server 2 and 3 config files:
49  *
50  *      server.1=127.0.0.1:8301:8401;8201
51  *      server.2=127.0.0.1:8302:8402;8202
52  *      server.3=127.0.0.1:8303:8403;8203
53  *
 *   Initially forward traffic between 730x and 830x and between 740x and 840x
55  *   This way server 1 can communicate with servers 2 and 3
56  *  ....
57  *
58  *   List<PortForwarder> pfs = startForwarding();
59  *  ....
60  *   // simulate a network interruption for server 1
61  *   stopForwarding(pfs);
62  *  ....
63  *   // restore connection
64  *   pfs = startForwarding();
65  *
66  *
67  *  private List<PortForwarder> startForwarding() throws IOException {
68  *      List<PortForwarder> res = new ArrayList<PortForwarder>();
69  *      res.add(new PortForwarder(8301, 7301));
70  *      res.add(new PortForwarder(8401, 7401));
71  *      res.add(new PortForwarder(7302, 8302));
72  *      res.add(new PortForwarder(7402, 8402));
73  *      res.add(new PortForwarder(7303, 8303));
74  *      res.add(new PortForwarder(7403, 8403));
75  *      return res;
76  *  }
77  *
78  *  private void stopForwarding(List<PortForwarder> pfs) throws Exception {
79  *       for (PortForwarder pf : pfs) {
80  *           pf.shutdown();
81  *       }
82  *  }
83  *
84  *
85  */
86 public class PortForwarder extends Thread {
87 
88     private static final Logger LOG = LoggerFactory.getLogger(PortForwarder.class);
89 
90     private static class PortForwardWorker implements Runnable {
91 
92         private final InputStream in;
93         private final OutputStream out;
94         private final Socket toClose;
95         private final Socket toClose2;
96         private boolean isFinished = false;
97 
PortForwardWorker(Socket toClose, Socket toClose2, InputStream in, OutputStream out)98         PortForwardWorker(Socket toClose, Socket toClose2, InputStream in, OutputStream out) {
99             this.toClose = toClose;
100             this.toClose2 = toClose2;
101             this.in = in;
102             this.out = out;
103             // LOG.info("starting forward for "+toClose);
104         }
105 
run()106         public void run() {
107             Thread.currentThread().setName(toClose.toString() + "-->" + toClose2.toString());
108             byte[] buf = new byte[1024];
109             try {
110                 while (true) {
111                     try {
112                         int read = this.in.read(buf);
113                         if (read > 0) {
114                             try {
115                                 this.out.write(buf, 0, read);
116                             } catch (IOException e) {
117                                 LOG.warn("exception during write", e);
118                                 break;
119                             }
120                         } else if (read < 0) {
121                             throw new IOException("read " + read);
122                         }
123                     } catch (SocketTimeoutException e) {
124                         LOG.error("socket timeout", e);
125                     }
126                 }
127                 Thread.sleep(1);
128             } catch (InterruptedException e) {
129                 LOG.warn("Interrupted", e);
130             } catch (SocketException e) {
131                 if (!"Socket closed".equals(e.getMessage())) {
132                     LOG.error("Unexpected exception", e);
133                 }
134             } catch (IOException e) {
135                 LOG.error("Unexpected exception", e);
136             } finally {
137                 shutdown();
138             }
139             LOG.info("Shutting down forward for {}", toClose);
140             isFinished = true;
141         }
142 
waitForShutdown(long timeoutMs)143         boolean waitForShutdown(long timeoutMs) throws InterruptedException {
144             synchronized (this) {
145                 if (!isFinished) {
146                     this.wait(timeoutMs);
147                 }
148             }
149             return isFinished;
150         }
151 
shutdown()152         public void shutdown() {
153             try {
154                 toClose.close();
155             } catch (IOException ex) {
156                 // ignore
157             }
158             try {
159                 toClose2.close();
160             } catch (IOException ex) {
161                 // ignore silently
162             }
163         }
164 
165     }
166 
167     private volatile boolean stopped = false;
168     private ExecutorService workerExecutor = Executors.newCachedThreadPool();
169     private List<PortForwardWorker> workers = new ArrayList<>();
170     private ServerSocket serverSocket;
171     private final int to;
172 
PortForwarder(int from, int to)173     public PortForwarder(int from, int to) throws IOException {
174         this.to = to;
175         serverSocket = new ServerSocket(from);
176         serverSocket.setSoTimeout(30000);
177         this.start();
178     }
179 
180     @Override
run()181     public void run() {
182         try {
183             while (!stopped) {
184                 Socket sock = null;
185                 try {
186                     LOG.info("accepting socket local:{} to:{}", serverSocket.getLocalPort(), to);
187                     sock = serverSocket.accept();
188                     LOG.info("accepted: local:{} from:{} to:{}", sock.getLocalPort(), sock.getPort(), to);
189                     Socket target = null;
190                     int retry = 10;
191                     while (sock.isConnected()) {
192                         try {
193                             target = new Socket("localhost", to);
194                             break;
195                         } catch (IOException e) {
196                             if (retry == 0) {
197                                 throw e;
198                             }
199                             LOG.warn(
200                                 "connection failed, retrying({}): local:{} from:{} to:{}",
201                                 retry,
202                                 sock.getLocalPort(),
203                                 sock.getPort(),
204                                 to,
205                                 e);
206                         }
207                         Thread.sleep(TimeUnit.SECONDS.toMillis(1));
208                         retry--;
209                     }
210                     LOG.info("connected: local:{} from:{} to:{}", sock.getLocalPort(), sock.getPort(), to);
211                     sock.setSoTimeout(30000);
212                     target.setSoTimeout(30000);
213 
214                     workers.add(new PortForwardWorker(sock, target, sock.getInputStream(), target.getOutputStream()));
215                     workers.add(new PortForwardWorker(target, sock, target.getInputStream(), sock.getOutputStream()));
216                     for (PortForwardWorker worker : workers) {
217                         workerExecutor.submit(worker);
218                     }
219                 } catch (SocketTimeoutException e) {
220                     LOG.warn("socket timed out", e);
221                 } catch (ConnectException e) {
222                     LOG.warn(
223                         "connection exception local:{} from:{} to:{}",
224                         sock.getLocalPort(),
225                         sock.getPort(),
226                         to,
227                         e);
228                     sock.close();
229                 } catch (IOException e) {
230                     if (!"Socket closed".equals(e.getMessage())) {
231                         LOG.warn(
232                             "unexpected exception local:{} from:{} to:{}",
233                             sock.getLocalPort(),
234                             sock.getPort(),
235                             to,
236                             e);
237                         throw e;
238                     }
239                 }
240 
241             }
242         } catch (IOException e) {
243             LOG.error("Unexpected exception to:{}", to, e);
244         } catch (InterruptedException e) {
245             LOG.error("Interrupted to:{}", to, e);
246         }
247     }
248 
shutdown()249     public void shutdown() throws Exception {
250         this.stopped = true;
251         this.serverSocket.close();
252         this.join();
253         this.workerExecutor.shutdownNow();
254         for (PortForwardWorker worker : workers) {
255             worker.shutdown();
256         }
257 
258         for (PortForwardWorker worker : workers) {
259             if (!worker.waitForShutdown(5000)) {
260                 throw new Exception("Failed to stop forwarding within 5 seconds");
261             }
262         }
263     }
264 
265 }
266