1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.shortcircuit;
19 
20 import java.io.IOException;
21 import java.net.InetSocketAddress;
22 import java.util.concurrent.TimeUnit;
23 
24 import com.google.common.annotations.VisibleForTesting;
25 import org.apache.commons.io.IOUtils;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.HadoopIllegalArgumentException;
29 import org.apache.hadoop.hdfs.DFSClient;
30 import org.apache.hadoop.hdfs.DFSClient.Conf;
31 import org.apache.hadoop.hdfs.DFSConfigKeys;
32 import org.apache.hadoop.net.unix.DomainSocket;
33 
34 import com.google.common.base.Preconditions;
35 import com.google.common.cache.Cache;
36 import com.google.common.cache.CacheBuilder;
37 import org.apache.hadoop.util.PerformanceAdvisory;
38 
39 public class DomainSocketFactory {
40   private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
41 
42   public enum PathState {
43     UNUSABLE(false, false),
44     SHORT_CIRCUIT_DISABLED(true, false),
45     VALID(true, true);
46 
PathState(boolean usableForDataTransfer, boolean usableForShortCircuit)47     PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) {
48       this.usableForDataTransfer = usableForDataTransfer;
49       this.usableForShortCircuit = usableForShortCircuit;
50     }
51 
getUsableForDataTransfer()52     public boolean getUsableForDataTransfer() {
53       return usableForDataTransfer;
54     }
55 
getUsableForShortCircuit()56     public boolean getUsableForShortCircuit() {
57       return usableForShortCircuit;
58     }
59 
60     private final boolean usableForDataTransfer;
61     private final boolean usableForShortCircuit;
62   }
63 
64   public static class PathInfo {
65     private final static PathInfo NOT_CONFIGURED =
66           new PathInfo("", PathState.UNUSABLE);
67 
68     final private String path;
69     final private PathState state;
70 
PathInfo(String path, PathState state)71     PathInfo(String path, PathState state) {
72       this.path = path;
73       this.state = state;
74     }
75 
getPath()76     public String getPath() {
77       return path;
78     }
79 
getPathState()80     public PathState getPathState() {
81       return state;
82     }
83 
84     @Override
toString()85     public String toString() {
86       return new StringBuilder().append("PathInfo{path=").append(path).
87           append(", state=").append(state).append("}").toString();
88     }
89   }
90 
91   /**
92    * Information about domain socket paths.
93    */
94   final Cache<String, PathState> pathMap =
95       CacheBuilder.newBuilder()
96       .expireAfterWrite(10, TimeUnit.MINUTES)
97       .build();
98 
DomainSocketFactory(Conf conf)99   public DomainSocketFactory(Conf conf) {
100     final String feature;
101     if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) {
102       feature = "The short-circuit local reads feature";
103     } else if (conf.isDomainSocketDataTraffic()) {
104       feature = "UNIX domain socket data traffic";
105     } else {
106       feature = null;
107     }
108 
109     if (feature == null) {
110       PerformanceAdvisory.LOG.debug(
111           "Both short-circuit local reads and UNIX domain socket are disabled.");
112     } else {
113       if (conf.getDomainSocketPath().isEmpty()) {
114         throw new HadoopIllegalArgumentException(feature + " is enabled but "
115             + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
116       } else if (DomainSocket.getLoadingFailureReason() != null) {
117         LOG.warn(feature + " cannot be used because "
118             + DomainSocket.getLoadingFailureReason());
119       } else {
120         LOG.debug(feature + " is enabled.");
121       }
122     }
123   }
124 
125   /**
126    * Get information about a domain socket path.
127    *
128    * @param addr         The inet address to use.
129    * @param conf         The client configuration.
130    *
131    * @return             Information about the socket path.
132    */
getPathInfo(InetSocketAddress addr, DFSClient.Conf conf)133   public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) {
134     // If there is no domain socket path configured, we can't use domain
135     // sockets.
136     if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED;
137     // If we can't do anything with the domain socket, don't create it.
138     if (!conf.isDomainSocketDataTraffic() &&
139         (!conf.isShortCircuitLocalReads() || conf.isUseLegacyBlockReaderLocal())) {
140       return PathInfo.NOT_CONFIGURED;
141     }
142     // If the DomainSocket code is not loaded, we can't create
143     // DomainSocket objects.
144     if (DomainSocket.getLoadingFailureReason() != null) {
145       return PathInfo.NOT_CONFIGURED;
146     }
147     // UNIX domain sockets can only be used to talk to local peers
148     if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
149     String escapedPath = DomainSocket.getEffectivePath(
150         conf.getDomainSocketPath(), addr.getPort());
151     PathState status = pathMap.getIfPresent(escapedPath);
152     if (status == null) {
153       return new PathInfo(escapedPath, PathState.VALID);
154     } else {
155       return new PathInfo(escapedPath, status);
156     }
157   }
158 
createSocket(PathInfo info, int socketTimeout)159   public DomainSocket createSocket(PathInfo info, int socketTimeout) {
160     Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE);
161     boolean success = false;
162     DomainSocket sock = null;
163     try {
164       sock = DomainSocket.connect(info.getPath());
165       sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout);
166       success = true;
167     } catch (IOException e) {
168       LOG.warn("error creating DomainSocket", e);
169       // fall through
170     } finally {
171       if (!success) {
172         if (sock != null) {
173           IOUtils.closeQuietly(sock);
174         }
175         pathMap.put(info.getPath(), PathState.UNUSABLE);
176         sock = null;
177       }
178     }
179     return sock;
180   }
181 
disableShortCircuitForPath(String path)182   public void disableShortCircuitForPath(String path) {
183     pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED);
184   }
185 
disableDomainSocketPath(String path)186   public void disableDomainSocketPath(String path) {
187     pathMap.put(path, PathState.UNUSABLE);
188   }
189 
190   @VisibleForTesting
clearPathMap()191   public void clearPathMap() {
192     pathMap.invalidateAll();
193   }
194 }
195