1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.net;
19 
20 import java.io.IOException;
21 import java.net.InetSocketAddress;
22 import java.net.ServerSocket;
23 import java.net.Socket;
24 import java.net.SocketTimeoutException;
25 import java.nio.channels.ServerSocketChannel;
26 import java.nio.channels.SocketChannel;
27 
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.hdfs.protocol.DatanodeID;
32 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
33 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
34 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
35 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
36 import org.apache.hadoop.io.IOUtils;
37 import org.apache.hadoop.ipc.Server;
38 import org.apache.hadoop.security.token.Token;
39 
40 @InterfaceAudience.Private
41 public class TcpPeerServer implements PeerServer {
42   static final Log LOG = LogFactory.getLog(TcpPeerServer.class);
43 
44   private final ServerSocket serverSocket;
45 
peerFromSocket(Socket socket)46   public static Peer peerFromSocket(Socket socket)
47       throws IOException {
48     Peer peer = null;
49     boolean success = false;
50     try {
51       // TCP_NODELAY is crucial here because of bad interactions between
52       // Nagle's Algorithm and Delayed ACKs. With connection keepalive
53       // between the client and DN, the conversation looks like:
54       //   1. Client -> DN: Read block X
55       //   2. DN -> Client: data for block X
56       //   3. Client -> DN: Status OK (successful read)
57       //   4. Client -> DN: Read block Y
58       // The fact that step #3 and #4 are both in the client->DN direction
59       // triggers Nagling. If the DN is using delayed ACKs, this results
60       // in a delay of 40ms or more.
61       //
62       // TCP_NODELAY disables nagling and thus avoids this performance
63       // disaster.
64       socket.setTcpNoDelay(true);
65       SocketChannel channel = socket.getChannel();
66       if (channel == null) {
67         peer = new BasicInetPeer(socket);
68       } else {
69         peer = new NioInetPeer(socket);
70       }
71       success = true;
72       return peer;
73     } finally {
74       if (!success) {
75         if (peer != null) peer.close();
76         socket.close();
77       }
78     }
79   }
80 
peerFromSocketAndKey( SaslDataTransferClient saslClient, Socket s, DataEncryptionKeyFactory keyFactory, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)81   public static Peer peerFromSocketAndKey(
82         SaslDataTransferClient saslClient, Socket s,
83         DataEncryptionKeyFactory keyFactory,
84         Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
85         throws IOException {
86     Peer peer = null;
87     boolean success = false;
88     try {
89       peer = peerFromSocket(s);
90       peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
91       success = true;
92       return peer;
93     } finally {
94       if (!success) {
95         IOUtils.cleanup(null, peer);
96       }
97     }
98   }
99 
100   /**
101    * Create a non-secure TcpPeerServer.
102    *
103    * @param socketWriteTimeout    The Socket write timeout in ms.
104    * @param bindAddr              The address to bind to.
105    * @throws IOException
106    */
TcpPeerServer(int socketWriteTimeout, InetSocketAddress bindAddr)107   public TcpPeerServer(int socketWriteTimeout,
108         InetSocketAddress bindAddr) throws IOException {
109     this.serverSocket = (socketWriteTimeout > 0) ?
110           ServerSocketChannel.open().socket() : new ServerSocket();
111     Server.bind(serverSocket, bindAddr, 0);
112   }
113 
114   /**
115    * Create a secure TcpPeerServer.
116    *
117    * @param secureResources   Security resources.
118    */
TcpPeerServer(SecureResources secureResources)119   public TcpPeerServer(SecureResources secureResources) {
120     this.serverSocket = secureResources.getStreamingSocket();
121   }
122 
123   /**
124    * @return     the IP address which this TcpPeerServer is listening on.
125    */
getStreamingAddr()126   public InetSocketAddress getStreamingAddr() {
127     return new InetSocketAddress(
128         serverSocket.getInetAddress().getHostAddress(),
129         serverSocket.getLocalPort());
130   }
131 
132   @Override
setReceiveBufferSize(int size)133   public void setReceiveBufferSize(int size) throws IOException {
134     this.serverSocket.setReceiveBufferSize(size);
135   }
136 
137   @Override
accept()138   public Peer accept() throws IOException, SocketTimeoutException {
139     Peer peer = peerFromSocket(serverSocket.accept());
140     return peer;
141   }
142 
143   @Override
getListeningString()144   public String getListeningString() {
145     return serverSocket.getLocalSocketAddress().toString();
146   }
147 
148   @Override
close()149   public void close() throws IOException {
150     try {
151       serverSocket.close();
152     } catch(IOException e) {
153       LOG.error("error closing TcpPeerServer: ", e);
154     }
155   }
156 
157   @Override
toString()158   public String toString() {
159     return "TcpPeerServer(" + getListeningString() + ")";
160   }
161 }
162