/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.shortcircuit;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.unix.DomainSocket;

import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.util.PerformanceAdvisory;

/**
 * Decides whether a UNIX domain socket path may be used to reach a peer,
 * remembers paths that recently failed, and opens {@link DomainSocket}
 * connections to usable paths.
 */
public class DomainSocketFactory {
  private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);

  /**
   * What a domain socket path may currently be used for.
   */
  public enum PathState {
    /** Usable neither for data transfer nor for short-circuit reads. */
    UNUSABLE(false, false),
    /** Usable for data transfer, but not for short-circuit reads. */
    SHORT_CIRCUIT_DISABLED(true, false),
    /** Usable for both data transfer and short-circuit reads. */
    VALID(true, true);

    private final boolean usableForDataTransfer;
    private final boolean usableForShortCircuit;

    PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) {
      this.usableForDataTransfer = usableForDataTransfer;
      this.usableForShortCircuit = usableForShortCircuit;
    }

    /**
     * @return true if the path may be used for data transfer.
     */
    public boolean getUsableForDataTransfer() {
      return usableForDataTransfer;
    }

    /**
     * @return true if the path may be used for short-circuit reads.
     */
    public boolean getUsableForShortCircuit() {
      return usableForShortCircuit;
    }
  }

  /**
   * An immutable pairing of a domain socket path with its {@link PathState}.
   */
  public static class PathInfo {
    /** Shared result for every case in which no domain socket can be used. */
    private static final PathInfo NOT_CONFIGURED =
        new PathInfo("", PathState.UNUSABLE);

    private final String path;
    private final PathState state;

    PathInfo(String path, PathState state) {
      this.path = path;
      this.state = state;
    }

    /**
     * @return the socket path; empty for the "not configured" instance.
     */
    public String getPath() {
      return path;
    }

    /**
     * @return the state associated with the path.
     */
    public PathState getPathState() {
      return state;
    }

    @Override
    public String toString() {
      return "PathInfo{path=" + path + ", state=" + state + "}";
    }
  }

  /**
   * Information about domain socket paths.
   *
   * Only non-default states are stored here: a path with no entry is reported
   * as {@link PathState#VALID} by {@link #getPathInfo}.  Entries expire ten
   * minutes after they are written, so a path that was marked as bad is
   * tried again after that interval.
   */
  final Cache<String, PathState> pathMap =
      CacheBuilder.newBuilder()
      .expireAfterWrite(10, TimeUnit.MINUTES)
      .build();

  /**
   * Validates the domain socket related client configuration and logs which
   * feature, if any, is going to use domain sockets.
   *
   * @param conf  The client configuration.
   *
   * @throws HadoopIllegalArgumentException
   *              If short-circuit local reads (non-legacy) or domain socket
   *              data traffic is enabled but no domain socket path is set.
   */
  public DomainSocketFactory(Conf conf) {
    final String feature;
    if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) {
      feature = "The short-circuit local reads feature";
    } else if (conf.isDomainSocketDataTraffic()) {
      feature = "UNIX domain socket data traffic";
    } else {
      feature = null;
    }

    if (feature == null) {
      PerformanceAdvisory.LOG.debug(
          "Both short-circuit local reads and UNIX domain socket are disabled.");
      return;
    }
    if (conf.getDomainSocketPath().isEmpty()) {
      throw new HadoopIllegalArgumentException(feature + " is enabled but "
          + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
    }
    // Read the failure reason once so the check and the message agree.
    final String loadingFailureReason = DomainSocket.getLoadingFailureReason();
    if (loadingFailureReason != null) {
      LOG.warn(feature + " cannot be used because " + loadingFailureReason);
    } else if (LOG.isDebugEnabled()) {
      // Guarded so the message is only built when it will be emitted.
      LOG.debug(feature + " is enabled.");
    }
  }

  /**
   * Get information about a domain socket path.
   *
   * @param addr         The inet address of the peer.  Its port is used to
   *                     compute the effective socket path.
   * @param conf         The client configuration.
   *
   * @return             Information about the socket path.  The shared
   *                     "not configured" instance (empty path, state
   *                     {@link PathState#UNUSABLE}) is returned when no path
   *                     is configured, when no feature that needs domain
   *                     sockets is enabled, when the DomainSocket code failed
   *                     to load, or when addr is not a local address.
   *                     Otherwise the effective path is returned with its
   *                     recorded state, or {@link PathState#VALID} if none is
   *                     recorded.
   */
  public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) {
    // If there is no domain socket path configured, we can't use domain
    // sockets.
    if (conf.getDomainSocketPath().isEmpty()) {
      return PathInfo.NOT_CONFIGURED;
    }
    // If we can't do anything with the domain socket, don't create it.
    if (!conf.isDomainSocketDataTraffic() &&
        (!conf.isShortCircuitLocalReads() || conf.isUseLegacyBlockReaderLocal())) {
      return PathInfo.NOT_CONFIGURED;
    }
    // If the DomainSocket code is not loaded, we can't create
    // DomainSocket objects.
    if (DomainSocket.getLoadingFailureReason() != null) {
      return PathInfo.NOT_CONFIGURED;
    }
    // UNIX domain sockets can only be used to talk to local peers
    if (!DFSClient.isLocalAddress(addr)) {
      return PathInfo.NOT_CONFIGURED;
    }
    String escapedPath = DomainSocket.getEffectivePath(
        conf.getDomainSocketPath(), addr.getPort());
    PathState status = pathMap.getIfPresent(escapedPath);
    // No recorded state means nothing has gone wrong with this path recently.
    return new PathInfo(escapedPath, status == null ? PathState.VALID : status);
  }

  /**
   * Connect to the domain socket described by info.
   *
   * If connecting or applying the receive timeout fails, the partially
   * created socket is closed and the path is recorded as
   * {@link PathState#UNUSABLE}.
   *
   * @param info           The path to connect to; its state must not be
   *                       {@link PathState#UNUSABLE}.
   * @param socketTimeout  The value to set for
   *                       {@link DomainSocket#RECEIVE_TIMEOUT}.
   *
   * @return               The connected socket, or null if an IOException
   *                       occurred (the exception is logged, not rethrown).
   *
   * @throws IllegalArgumentException  If the state of info is UNUSABLE.
   */
  public DomainSocket createSocket(PathInfo info, int socketTimeout) {
    Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE,
        "cannot create a DomainSocket for a path in state UNUSABLE");
    boolean success = false;
    DomainSocket sock = null;
    try {
      sock = DomainSocket.connect(info.getPath());
      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout);
      success = true;
    } catch (IOException e) {
      LOG.warn("error creating DomainSocket", e);
      // fall through
    } finally {
      // Runs for IOException and for any unchecked failure alike, so a socket
      // that connected but could not be configured is never leaked.
      if (!success) {
        if (sock != null) {
          IOUtils.closeQuietly(sock);
        }
        pathMap.put(info.getPath(), PathState.UNUSABLE);
        sock = null;
      }
    }
    return sock;
  }

  /**
   * Record that short-circuit reads must not be attempted over the given
   * path, while leaving it usable for data transfer.
   *
   * @param path  The domain socket path.
   */
  public void disableShortCircuitForPath(String path) {
    pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED);
  }

  /**
   * Record that the given path must not be used at all.
   *
   * @param path  The domain socket path.
   */
  public void disableDomainSocketPath(String path) {
    pathMap.put(path, PathState.UNUSABLE);
  }

  /**
   * Forget every recorded path state.
   */
  @VisibleForTesting
  public void clearPathMap() {
    pathMap.invalidateAll();
  }
}