1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.net; 19 20 import java.io.IOException; 21 import java.net.InetSocketAddress; 22 import java.net.ServerSocket; 23 import java.net.Socket; 24 import java.net.SocketTimeoutException; 25 import java.nio.channels.ServerSocketChannel; 26 import java.nio.channels.SocketChannel; 27 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 import org.apache.hadoop.classification.InterfaceAudience; 31 import org.apache.hadoop.hdfs.protocol.DatanodeID; 32 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; 33 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 34 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 35 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; 36 import org.apache.hadoop.io.IOUtils; 37 import org.apache.hadoop.ipc.Server; 38 import org.apache.hadoop.security.token.Token; 39 40 @InterfaceAudience.Private 41 public class TcpPeerServer implements PeerServer { 42 static final Log LOG = LogFactory.getLog(TcpPeerServer.class); 43 44 private final ServerSocket serverSocket; 45 peerFromSocket(Socket socket)46 public static Peer peerFromSocket(Socket socket) 47 throws IOException { 48 Peer peer = null; 49 boolean success = false; 50 try { 51 // TCP_NODELAY is crucial here because of bad interactions between 52 // Nagle's Algorithm and Delayed ACKs. With connection keepalive 53 // between the client and DN, the conversation looks like: 54 // 1. Client -> DN: Read block X 55 // 2. DN -> Client: data for block X 56 // 3. Client -> DN: Status OK (successful read) 57 // 4. Client -> DN: Read block Y 58 // The fact that step #3 and #4 are both in the client->DN direction 59 // triggers Nagling. If the DN is using delayed ACKs, this results 60 // in a delay of 40ms or more. 61 // 62 // TCP_NODELAY disables nagling and thus avoids this performance 63 // disaster. 64 socket.setTcpNoDelay(true); 65 SocketChannel channel = socket.getChannel(); 66 if (channel == null) { 67 peer = new BasicInetPeer(socket); 68 } else { 69 peer = new NioInetPeer(socket); 70 } 71 success = true; 72 return peer; 73 } finally { 74 if (!success) { 75 if (peer != null) peer.close(); 76 socket.close(); 77 } 78 } 79 } 80 peerFromSocketAndKey( SaslDataTransferClient saslClient, Socket s, DataEncryptionKeyFactory keyFactory, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)81 public static Peer peerFromSocketAndKey( 82 SaslDataTransferClient saslClient, Socket s, 83 DataEncryptionKeyFactory keyFactory, 84 Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) 85 throws IOException { 86 Peer peer = null; 87 boolean success = false; 88 try { 89 peer = peerFromSocket(s); 90 peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId); 91 success = true; 92 return peer; 93 } finally { 94 if (!success) { 95 IOUtils.cleanup(null, peer); 96 } 97 } 98 } 99 100 /** 101 * Create a non-secure TcpPeerServer. 102 * 103 * @param socketWriteTimeout The Socket write timeout in ms. 104 * @param bindAddr The address to bind to. 105 * @throws IOException 106 */ TcpPeerServer(int socketWriteTimeout, InetSocketAddress bindAddr)107 public TcpPeerServer(int socketWriteTimeout, 108 InetSocketAddress bindAddr) throws IOException { 109 this.serverSocket = (socketWriteTimeout > 0) ? 110 ServerSocketChannel.open().socket() : new ServerSocket(); 111 Server.bind(serverSocket, bindAddr, 0); 112 } 113 114 /** 115 * Create a secure TcpPeerServer. 116 * 117 * @param secureResources Security resources. 118 */ TcpPeerServer(SecureResources secureResources)119 public TcpPeerServer(SecureResources secureResources) { 120 this.serverSocket = secureResources.getStreamingSocket(); 121 } 122 123 /** 124 * @return the IP address which this TcpPeerServer is listening on. 125 */ getStreamingAddr()126 public InetSocketAddress getStreamingAddr() { 127 return new InetSocketAddress( 128 serverSocket.getInetAddress().getHostAddress(), 129 serverSocket.getLocalPort()); 130 } 131 132 @Override setReceiveBufferSize(int size)133 public void setReceiveBufferSize(int size) throws IOException { 134 this.serverSocket.setReceiveBufferSize(size); 135 } 136 137 @Override accept()138 public Peer accept() throws IOException, SocketTimeoutException { 139 Peer peer = peerFromSocket(serverSocket.accept()); 140 return peer; 141 } 142 143 @Override getListeningString()144 public String getListeningString() { 145 return serverSocket.getLocalSocketAddress().toString(); 146 } 147 148 @Override close()149 public void close() throws IOException { 150 try { 151 serverSocket.close(); 152 } catch(IOException e) { 153 LOG.error("error closing TcpPeerServer: ", e); 154 } 155 } 156 157 @Override toString()158 public String toString() { 159 return "TcpPeerServer(" + getListeningString() + ")"; 160 } 161 } 162