1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs;
19 
20 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
21 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
22 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
23 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
24 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
25 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
26 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
27 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
28 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
29 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
30 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
31 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
32 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
33 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
34 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
35 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
36 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
37 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
38 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
39 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
40 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
41 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
42 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
43 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
44 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
45 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
46 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
47 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT;
48 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
49 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
50 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
51 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
52 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
53 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
54 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
55 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
56 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
57 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
58 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
59 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
60 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
61 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
62 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
63 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
64 
65 import java.io.BufferedOutputStream;
66 import java.io.DataInputStream;
67 import java.io.DataOutputStream;
68 import java.io.FileNotFoundException;
69 import java.io.IOException;
70 import java.io.InputStream;
71 import java.io.OutputStream;
72 import java.lang.reflect.Proxy;
73 import java.net.InetAddress;
74 import java.net.InetSocketAddress;
75 import java.net.Socket;
76 import java.net.SocketAddress;
77 import java.net.URI;
78 import java.net.UnknownHostException;
79 import java.nio.charset.Charset;
80 import java.security.GeneralSecurityException;
81 import java.util.ArrayList;
82 import java.util.Collections;
83 import java.util.EnumSet;
84 import java.util.HashMap;
85 import java.util.LinkedHashMap;
86 import java.util.List;
87 import java.util.Map;
88 import java.util.Random;
89 import java.util.concurrent.SynchronousQueue;
90 import java.util.concurrent.ThreadPoolExecutor;
91 import java.util.concurrent.TimeUnit;
92 import java.util.concurrent.atomic.AtomicBoolean;
93 import java.util.concurrent.atomic.AtomicInteger;
94 
95 import javax.net.SocketFactory;
96 
97 import org.apache.commons.logging.Log;
98 import org.apache.commons.logging.LogFactory;
99 import org.apache.hadoop.HadoopIllegalArgumentException;
100 import org.apache.hadoop.classification.InterfaceAudience;
101 import org.apache.hadoop.conf.Configuration;
102 import org.apache.hadoop.crypto.CipherSuite;
103 import org.apache.hadoop.crypto.CryptoCodec;
104 import org.apache.hadoop.crypto.CryptoInputStream;
105 import org.apache.hadoop.crypto.CryptoOutputStream;
106 import org.apache.hadoop.crypto.CryptoProtocolVersion;
107 import org.apache.hadoop.crypto.key.KeyProvider;
108 import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
109 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
110 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
111 import org.apache.hadoop.fs.BlockLocation;
112 import org.apache.hadoop.fs.BlockStorageLocation;
113 import org.apache.hadoop.fs.CacheFlag;
114 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
115 import org.apache.hadoop.fs.ContentSummary;
116 import org.apache.hadoop.fs.CreateFlag;
117 import org.apache.hadoop.fs.FileAlreadyExistsException;
118 import org.apache.hadoop.fs.FileEncryptionInfo;
119 import org.apache.hadoop.fs.FileSystem;
120 import org.apache.hadoop.fs.FsServerDefaults;
121 import org.apache.hadoop.fs.FsStatus;
122 import org.apache.hadoop.fs.HdfsBlockLocation;
123 import org.apache.hadoop.fs.InvalidPathException;
124 import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
125 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
126 import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
127 import org.apache.hadoop.fs.Options;
128 import org.apache.hadoop.fs.Options.ChecksumOpt;
129 import org.apache.hadoop.fs.ParentNotDirectoryException;
130 import org.apache.hadoop.fs.Path;
131 import org.apache.hadoop.fs.StorageType;
132 import org.apache.hadoop.fs.RemoteIterator;
133 import org.apache.hadoop.fs.UnresolvedLinkException;
134 import org.apache.hadoop.fs.VolumeId;
135 import org.apache.hadoop.fs.XAttr;
136 import org.apache.hadoop.fs.XAttrSetFlag;
137 import org.apache.hadoop.fs.permission.AclEntry;
138 import org.apache.hadoop.fs.permission.AclStatus;
139 import org.apache.hadoop.fs.permission.FsAction;
140 import org.apache.hadoop.fs.permission.FsPermission;
141 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
142 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
143 import org.apache.hadoop.hdfs.net.Peer;
144 import org.apache.hadoop.hdfs.net.TcpPeerServer;
145 import org.apache.hadoop.hdfs.protocol.AclException;
146 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
147 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
148 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
149 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
150 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
151 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
152 import org.apache.hadoop.hdfs.protocol.CachePoolIterator;
153 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
154 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
155 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
156 import org.apache.hadoop.hdfs.protocol.DatanodeID;
157 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
158 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
159 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
160 import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
161 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
162 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
163 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
164 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
165 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
166 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
167 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
168 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
169 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
170 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
171 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
172 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
173 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
174 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
175 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
176 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
177 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
178 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
179 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
180 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
181 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
182 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
183 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
184 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
185 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
186 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
187 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
188 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
189 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
190 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
191 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
192 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
193 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
194 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
195 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
196 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
197 import org.apache.hadoop.hdfs.server.namenode.NameNode;
198 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
199 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
200 import org.apache.hadoop.hdfs.util.ByteArrayManager;
201 import org.apache.hadoop.io.DataOutputBuffer;
202 import org.apache.hadoop.io.EnumSetWritable;
203 import org.apache.hadoop.io.IOUtils;
204 import org.apache.hadoop.io.MD5Hash;
205 import org.apache.hadoop.io.Text;
206 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
207 import org.apache.hadoop.ipc.Client;
208 import org.apache.hadoop.ipc.RPC;
209 import org.apache.hadoop.ipc.RemoteException;
210 import org.apache.hadoop.ipc.RpcInvocationHandler;
211 import org.apache.hadoop.net.DNS;
212 import org.apache.hadoop.net.NetUtils;
213 import org.apache.hadoop.security.AccessControlException;
214 import org.apache.hadoop.security.UserGroupInformation;
215 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
216 import org.apache.hadoop.security.token.Token;
217 import org.apache.hadoop.security.token.TokenRenewer;
218 import org.apache.hadoop.tracing.SpanReceiverHost;
219 import org.apache.hadoop.tracing.TraceUtils;
220 import org.apache.hadoop.util.Daemon;
221 import org.apache.hadoop.util.DataChecksum;
222 import org.apache.hadoop.util.DataChecksum.Type;
223 import org.apache.hadoop.util.Progressable;
224 import org.apache.hadoop.util.Time;
225 import org.apache.htrace.Sampler;
226 import org.apache.htrace.SamplerBuilder;
227 import org.apache.htrace.Span;
228 import org.apache.htrace.Trace;
229 import org.apache.htrace.TraceScope;
230 
231 import com.google.common.annotations.VisibleForTesting;
232 import com.google.common.base.Joiner;
233 import com.google.common.base.Preconditions;
234 import com.google.common.collect.Lists;
235 import com.google.common.net.InetAddresses;
236 
237 /********************************************************
238  * DFSClient can connect to a Hadoop Filesystem and
239  * perform basic file tasks.  It uses the ClientProtocol
240  * to communicate with a NameNode daemon, and connects
241  * directly to DataNodes to read/write block data.
242  *
243  * Hadoop DFS users should obtain an instance of
244  * DistributedFileSystem, which uses DFSClient to handle
245  * filesystem tasks.
246  *
247  ********************************************************/
248 @InterfaceAudience.Private
249 public class DFSClient implements java.io.Closeable, RemotePeerFactory,
250     DataEncryptionKeyFactory {
251   public static final Log LOG = LogFactory.getLog(DFSClient.class);
252   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
253   static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
254 
255   private final Configuration conf;
256   private final Conf dfsClientConf;
257   final ClientProtocol namenode;
258   /* The service used for delegation tokens */
259   private Text dtService;
260 
261   final UserGroupInformation ugi;
262   volatile boolean clientRunning = true;
263   volatile long lastLeaseRenewal;
264   private volatile FsServerDefaults serverDefaults;
265   private volatile long serverDefaultsLastUpdate;
266   final String clientName;
267   final SocketFactory socketFactory;
268   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
269   final FileSystem.Statistics stats;
270   private final String authority;
271   private final Random r = new Random();
272   private SocketAddress[] localInterfaceAddrs;
273   private DataEncryptionKey encryptionKey;
274   final SaslDataTransferClient saslClient;
275   private final CachingStrategy defaultReadCachingStrategy;
276   private final CachingStrategy defaultWriteCachingStrategy;
277   private final ClientContext clientContext;
278   private volatile long hedgedReadThresholdMillis;
279   private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
280       new DFSHedgedReadMetrics();
281   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
282   private final Sampler<?> traceSampler;
283 
284   /**
285    * DFSClient configuration
286    */
287   public static class Conf {
288     final int hdfsTimeout;    // timeout value for a DFS operation.
289 
290     final int maxFailoverAttempts;
291     final int maxRetryAttempts;
292     final int failoverSleepBaseMillis;
293     final int failoverSleepMaxMillis;
294     final int maxBlockAcquireFailures;
295     final int confTime;
296     final int ioBufferSize;
297     final ChecksumOpt defaultChecksumOpt;
298     final int writePacketSize;
299     final int writeMaxPackets;
300     final ByteArrayManager.Conf writeByteArrayManagerConf;
301     final int socketTimeout;
302     final int socketCacheCapacity;
303     final long socketCacheExpiry;
304     final long excludedNodesCacheExpiry;
305     /** Wait time window (in msec) if BlockMissingException is caught */
306     final int timeWindow;
307     final int nCachedConnRetry;
308     final int nBlockWriteRetry;
309     final int nBlockWriteLocateFollowingRetry;
310     final long defaultBlockSize;
311     final long prefetchSize;
312     final short defaultReplication;
313     final String taskId;
314     final FsPermission uMask;
315     final boolean connectToDnViaHostname;
316     final boolean getHdfsBlocksMetadataEnabled;
317     final int getFileBlockStorageLocationsNumThreads;
318     final int getFileBlockStorageLocationsTimeoutMs;
319     final int retryTimesForGetLastBlockLength;
320     final int retryIntervalForGetLastBlockLength;
321     final long datanodeRestartTimeout;
322     final long dfsclientSlowIoWarningThresholdMs;
323 
324     final boolean useLegacyBlockReader;
325     final boolean useLegacyBlockReaderLocal;
326     final String domainSocketPath;
327     final boolean skipShortCircuitChecksums;
328     final int shortCircuitBufferSize;
329     final boolean shortCircuitLocalReads;
330     final boolean domainSocketDataTraffic;
331     final int shortCircuitStreamsCacheSize;
332     final long shortCircuitStreamsCacheExpiryMs;
333     final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
334 
335     final boolean shortCircuitMmapEnabled;
336     final int shortCircuitMmapCacheSize;
337     final long shortCircuitMmapCacheExpiryMs;
338     final long shortCircuitMmapCacheRetryTimeout;
339     final long shortCircuitCacheStaleThresholdMs;
340 
341     final long keyProviderCacheExpiryMs;
342     public BlockReaderFactory.FailureInjector brfFailureInjector =
343       new BlockReaderFactory.FailureInjector();
344 
Conf(Configuration conf)345     public Conf(Configuration conf) {
346       // The hdfsTimeout is currently the same as the ipc timeout
347       hdfsTimeout = Client.getTimeout(conf);
348       maxFailoverAttempts = conf.getInt(
349           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
350           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
351       maxRetryAttempts = conf.getInt(
352           DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
353           DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
354       failoverSleepBaseMillis = conf.getInt(
355           DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
356           DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
357       failoverSleepMaxMillis = conf.getInt(
358           DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
359           DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
360 
361       maxBlockAcquireFailures = conf.getInt(
362           DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
363           DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
364       confTime = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
365           HdfsServerConstants.WRITE_TIMEOUT);
366       ioBufferSize = conf.getInt(
367           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
368           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
369       defaultChecksumOpt = getChecksumOptFromConf(conf);
370       socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
371           HdfsServerConstants.READ_TIMEOUT);
372       /** dfs.write.packet.size is an internal config variable */
373       writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
374           DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
375       writeMaxPackets = conf.getInt(
376           DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
377           DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
378 
379       final boolean byteArrayManagerEnabled = conf.getBoolean(
380           DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
381           DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
382       if (!byteArrayManagerEnabled) {
383         writeByteArrayManagerConf = null;
384       } else {
385         final int countThreshold = conf.getInt(
386             DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
387             DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
388         final int countLimit = conf.getInt(
389             DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
390             DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
391         final long countResetTimePeriodMs = conf.getLong(
392             DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
393             DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
394         writeByteArrayManagerConf = new ByteArrayManager.Conf(
395             countThreshold, countLimit, countResetTimePeriodMs);
396       }
397 
398 
399       defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
400           DFS_BLOCK_SIZE_DEFAULT);
401       defaultReplication = (short) conf.getInt(
402           DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
403       taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
404       socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
405           DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
406       socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
407           DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
408       excludedNodesCacheExpiry = conf.getLong(
409           DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
410           DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
411       prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
412           10 * defaultBlockSize);
413       timeWindow = conf.getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
414       nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
415           DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
416       nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
417           DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
418       nBlockWriteLocateFollowingRetry = conf.getInt(
419           DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
420           DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
421       uMask = FsPermission.getUMask(conf);
422       connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
423           DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
424       getHdfsBlocksMetadataEnabled = conf.getBoolean(
425           DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
426           DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
427       getFileBlockStorageLocationsNumThreads = conf.getInt(
428           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
429           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
430       getFileBlockStorageLocationsTimeoutMs = conf.getInt(
431           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
432           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
433       retryTimesForGetLastBlockLength = conf.getInt(
434           DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH,
435           DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
436       retryIntervalForGetLastBlockLength = conf.getInt(
437         DFSConfigKeys.DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH,
438         DFSConfigKeys.DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
439 
440       useLegacyBlockReader = conf.getBoolean(
441           DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
442           DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
443       useLegacyBlockReaderLocal = conf.getBoolean(
444           DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
445           DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
446       shortCircuitLocalReads = conf.getBoolean(
447           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
448           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
449       domainSocketDataTraffic = conf.getBoolean(
450           DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
451           DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
452       domainSocketPath = conf.getTrimmed(
453           DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
454           DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
455 
456       if (BlockReaderLocal.LOG.isDebugEnabled()) {
457         BlockReaderLocal.LOG.debug(
458             DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
459             + " = " + useLegacyBlockReaderLocal);
460         BlockReaderLocal.LOG.debug(
461             DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
462             + " = " + shortCircuitLocalReads);
463         BlockReaderLocal.LOG.debug(
464             DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
465             + " = " + domainSocketDataTraffic);
466         BlockReaderLocal.LOG.debug(
467             DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
468             + " = " + domainSocketPath);
469       }
470 
471       skipShortCircuitChecksums = conf.getBoolean(
472           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
473           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
474       shortCircuitBufferSize = conf.getInt(
475           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
476           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
477       shortCircuitStreamsCacheSize = conf.getInt(
478           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
479           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
480       shortCircuitStreamsCacheExpiryMs = conf.getLong(
481           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
482           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
483       shortCircuitMmapEnabled = conf.getBoolean(
484           DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED,
485           DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT);
486       shortCircuitMmapCacheSize = conf.getInt(
487           DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
488           DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
489       shortCircuitMmapCacheExpiryMs = conf.getLong(
490           DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
491           DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
492       shortCircuitMmapCacheRetryTimeout = conf.getLong(
493           DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
494           DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
495       shortCircuitCacheStaleThresholdMs = conf.getLong(
496           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
497           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
498       shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
499           DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
500           DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
501 
502       datanodeRestartTimeout = conf.getLong(
503           DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
504           DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
505       dfsclientSlowIoWarningThresholdMs = conf.getLong(
506           DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
507           DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
508 
509       keyProviderCacheExpiryMs = conf.getLong(
510           DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
511           DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
512     }
513 
isUseLegacyBlockReaderLocal()514     public boolean isUseLegacyBlockReaderLocal() {
515       return useLegacyBlockReaderLocal;
516     }
517 
getDomainSocketPath()518     public String getDomainSocketPath() {
519       return domainSocketPath;
520     }
521 
isShortCircuitLocalReads()522     public boolean isShortCircuitLocalReads() {
523       return shortCircuitLocalReads;
524     }
525 
isDomainSocketDataTraffic()526     public boolean isDomainSocketDataTraffic() {
527       return domainSocketDataTraffic;
528     }
529 
getChecksumType(Configuration conf)530     private DataChecksum.Type getChecksumType(Configuration conf) {
531       final String checksum = conf.get(
532           DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
533           DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
534       try {
535         return DataChecksum.Type.valueOf(checksum);
536       } catch(IllegalArgumentException iae) {
537         LOG.warn("Bad checksum type: " + checksum + ". Using default "
538             + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
539         return DataChecksum.Type.valueOf(
540             DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
541       }
542     }
543 
544     // Construct a checksum option from conf
getChecksumOptFromConf(Configuration conf)545     private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
546       DataChecksum.Type type = getChecksumType(conf);
547       int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
548           DFS_BYTES_PER_CHECKSUM_DEFAULT);
549       return new ChecksumOpt(type, bytesPerChecksum);
550     }
551 
552     // create a DataChecksum with the default option.
createChecksum()553     private DataChecksum createChecksum() throws IOException {
554       return createChecksum(null);
555     }
556 
createChecksum(ChecksumOpt userOpt)557     private DataChecksum createChecksum(ChecksumOpt userOpt) {
558       // Fill in any missing field with the default.
559       ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
560           defaultChecksumOpt, userOpt);
561       DataChecksum dataChecksum = DataChecksum.newDataChecksum(
562           myOpt.getChecksumType(),
563           myOpt.getBytesPerChecksum());
564       if (dataChecksum == null) {
565         throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
566             + userOpt + ", default=" + defaultChecksumOpt
567             + ", effective=null");
568       }
569       return dataChecksum;
570     }
571   }
572 
getConf()573   public Conf getConf() {
574     return dfsClientConf;
575   }
576 
getConfiguration()577   Configuration getConfiguration() {
578     return conf;
579   }
580 
581   /**
582    * A map from file names to {@link DFSOutputStream} objects
583    * that are currently being written by this client.
584    * Note that a file can only be written by a single client.
585    */
586   private final Map<Long, DFSOutputStream> filesBeingWritten
587       = new HashMap<Long, DFSOutputStream>();
588 
589   /**
590    * Same as this(NameNode.getAddress(conf), conf);
591    * @see #DFSClient(InetSocketAddress, Configuration)
592    * @deprecated Deprecated at 0.21
593    */
594   @Deprecated
DFSClient(Configuration conf)595   public DFSClient(Configuration conf) throws IOException {
596     this(NameNode.getAddress(conf), conf);
597   }
598 
DFSClient(InetSocketAddress address, Configuration conf)599   public DFSClient(InetSocketAddress address, Configuration conf) throws IOException {
600     this(NameNode.getUri(address), conf);
601   }
602 
603   /**
604    * Same as this(nameNodeUri, conf, null);
605    * @see #DFSClient(URI, Configuration, FileSystem.Statistics)
606    */
DFSClient(URI nameNodeUri, Configuration conf )607   public DFSClient(URI nameNodeUri, Configuration conf
608       ) throws IOException {
609     this(nameNodeUri, conf, null);
610   }
611 
612   /**
613    * Same as this(nameNodeUri, null, conf, stats);
614    * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics)
615    */
DFSClient(URI nameNodeUri, Configuration conf, FileSystem.Statistics stats)616   public DFSClient(URI nameNodeUri, Configuration conf,
617                    FileSystem.Statistics stats)
618     throws IOException {
619     this(nameNodeUri, null, conf, stats);
620   }
621 
622   /**
623    * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
624    * If HA is enabled and a positive value is set for
625    * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
626    * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
627    * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode
628    * must be null.
629    */
630   @VisibleForTesting
DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)631   public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
632       Configuration conf, FileSystem.Statistics stats)
633     throws IOException {
634     SpanReceiverHost.get(conf, DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX);
635     traceSampler = new SamplerBuilder(TraceUtils.
636         wrapHadoopConf(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX, conf)).build();
637     // Copy only the required DFSClient configuration
638     this.dfsClientConf = new Conf(conf);
639     if (this.dfsClientConf.useLegacyBlockReaderLocal) {
640       LOG.debug("Using legacy short-circuit local reads.");
641     }
642     this.conf = conf;
643     this.stats = stats;
644     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
645     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
646 
647     this.ugi = UserGroupInformation.getCurrentUser();
648 
649     this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
650     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
651         DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
652     int numResponseToDrop = conf.getInt(
653         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
654         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
655     NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
656     AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
657     if (numResponseToDrop > 0) {
658       // This case is used for testing.
659       LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
660           + " is set to " + numResponseToDrop
661           + ", this hacked client will proactively drop responses");
662       proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
663           nameNodeUri, ClientProtocol.class, numResponseToDrop,
664           nnFallbackToSimpleAuth);
665     }
666 
667     if (proxyInfo != null) {
668       this.dtService = proxyInfo.getDelegationTokenService();
669       this.namenode = proxyInfo.getProxy();
670     } else if (rpcNamenode != null) {
671       // This case is used for testing.
672       Preconditions.checkArgument(nameNodeUri == null);
673       this.namenode = rpcNamenode;
674       dtService = null;
675     } else {
676       Preconditions.checkArgument(nameNodeUri != null,
677           "null URI");
678       proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
679           ClientProtocol.class, nnFallbackToSimpleAuth);
680       this.dtService = proxyInfo.getDelegationTokenService();
681       this.namenode = proxyInfo.getProxy();
682     }
683 
684     String localInterfaces[] =
685       conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
686     localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
687     if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
688       LOG.debug("Using local interfaces [" +
689       Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
690       Joiner.on(',').join(localInterfaceAddrs) + "]");
691     }
692 
693     Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
694         null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
695     Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
696         null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
697     Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
698         null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
699     this.defaultReadCachingStrategy =
700         new CachingStrategy(readDropBehind, readahead);
701     this.defaultWriteCachingStrategy =
702         new CachingStrategy(writeDropBehind, readahead);
703     this.clientContext = ClientContext.get(
704         conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
705         dfsClientConf);
706     this.hedgedReadThresholdMillis = conf.getLong(
707         DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
708         DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
709     int numThreads = conf.getInt(
710         DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
711         DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
712     if (numThreads > 0) {
713       this.initThreadsNumForHedgedReads(numThreads);
714     }
715     this.saslClient = new SaslDataTransferClient(
716       conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
717       TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
718   }
719 
720   /**
721    * Return the socket addresses to use with each configured
722    * local interface. Local interfaces may be specified by IP
723    * address, IP address range using CIDR notation, interface
724    * name (e.g. eth0) or sub-interface name (e.g. eth0:0).
725    * The socket addresses consist of the IPs for the interfaces
726    * and the ephemeral port (port 0). If an IP, IP range, or
727    * interface name matches an interface with sub-interfaces
728    * only the IP of the interface is used. Sub-interfaces can
729    * be used by specifying them explicitly (by IP or name).
730    *
731    * @return SocketAddresses for the configured local interfaces,
732    *    or an empty array if none are configured
733    * @throws UnknownHostException if a given interface name is invalid
734    */
getLocalInterfaceAddrs( String interfaceNames[])735   private static SocketAddress[] getLocalInterfaceAddrs(
736       String interfaceNames[]) throws UnknownHostException {
737     List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
738     for (String interfaceName : interfaceNames) {
739       if (InetAddresses.isInetAddress(interfaceName)) {
740         localAddrs.add(new InetSocketAddress(interfaceName, 0));
741       } else if (NetUtils.isValidSubnet(interfaceName)) {
742         for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
743           localAddrs.add(new InetSocketAddress(addr, 0));
744         }
745       } else {
746         for (String ip : DNS.getIPs(interfaceName, false)) {
747           localAddrs.add(new InetSocketAddress(ip, 0));
748         }
749       }
750     }
751     return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
752   }
753 
754   /**
755    * Select one of the configured local interfaces at random. We use a random
756    * interface because other policies like round-robin are less effective
757    * given that we cache connections to datanodes.
758    *
759    * @return one of the local interface addresses at random, or null if no
760    *    local interfaces are configured
761    */
getRandomLocalInterfaceAddr()762   SocketAddress getRandomLocalInterfaceAddr() {
763     if (localInterfaceAddrs.length == 0) {
764       return null;
765     }
766     final int idx = r.nextInt(localInterfaceAddrs.length);
767     final SocketAddress addr = localInterfaceAddrs[idx];
768     if (LOG.isDebugEnabled()) {
769       LOG.debug("Using local interface " + addr);
770     }
771     return addr;
772   }
773 
774   /**
775    * Return the number of times the client should go back to the namenode
776    * to retrieve block locations when reading.
777    */
getMaxBlockAcquireFailures()778   int getMaxBlockAcquireFailures() {
779     return dfsClientConf.maxBlockAcquireFailures;
780   }
781 
782   /**
783    * Return the timeout that clients should use when writing to datanodes.
784    * @param numNodes the number of nodes in the pipeline.
785    */
getDatanodeWriteTimeout(int numNodes)786   int getDatanodeWriteTimeout(int numNodes) {
787     return (dfsClientConf.confTime > 0) ?
788       (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
789   }
790 
getDatanodeReadTimeout(int numNodes)791   int getDatanodeReadTimeout(int numNodes) {
792     return dfsClientConf.socketTimeout > 0 ?
793         (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes +
794             dfsClientConf.socketTimeout) : 0;
795   }
796 
getHdfsTimeout()797   int getHdfsTimeout() {
798     return dfsClientConf.hdfsTimeout;
799   }
800 
801   @VisibleForTesting
getClientName()802   public String getClientName() {
803     return clientName;
804   }
805 
checkOpen()806   void checkOpen() throws IOException {
807     if (!clientRunning) {
808       IOException result = new IOException("Filesystem closed");
809       throw result;
810     }
811   }
812 
813   /** Return the lease renewer instance. The renewer thread won't start
814    *  until the first output stream is created. The same instance will
815    *  be returned until all output streams are closed.
816    */
getLeaseRenewer()817   public LeaseRenewer getLeaseRenewer() throws IOException {
818       return LeaseRenewer.getInstance(authority, ugi, this);
819   }
820 
821   /** Get a lease and start automatic renewal */
beginFileLease(final long inodeId, final DFSOutputStream out)822   private void beginFileLease(final long inodeId, final DFSOutputStream out)
823       throws IOException {
824     getLeaseRenewer().put(inodeId, out, this);
825   }
826 
827   /** Stop renewal of lease for the file. */
endFileLease(final long inodeId)828   void endFileLease(final long inodeId) throws IOException {
829     getLeaseRenewer().closeFile(inodeId, this);
830   }
831 
832 
833   /** Put a file. Only called from LeaseRenewer, where proper locking is
834    *  enforced to consistently update its local dfsclients array and
835    *  client's filesBeingWritten map.
836    */
putFileBeingWritten(final long inodeId, final DFSOutputStream out)837   void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
838     synchronized(filesBeingWritten) {
839       filesBeingWritten.put(inodeId, out);
840       // update the last lease renewal time only when there was no
841       // writes. once there is one write stream open, the lease renewer
842       // thread keeps it updated well with in anyone's expiration time.
843       if (lastLeaseRenewal == 0) {
844         updateLastLeaseRenewal();
845       }
846     }
847   }
848 
849   /** Remove a file. Only called from LeaseRenewer. */
removeFileBeingWritten(final long inodeId)850   void removeFileBeingWritten(final long inodeId) {
851     synchronized(filesBeingWritten) {
852       filesBeingWritten.remove(inodeId);
853       if (filesBeingWritten.isEmpty()) {
854         lastLeaseRenewal = 0;
855       }
856     }
857   }
858 
859   /** Is file-being-written map empty? */
isFilesBeingWrittenEmpty()860   boolean isFilesBeingWrittenEmpty() {
861     synchronized(filesBeingWritten) {
862       return filesBeingWritten.isEmpty();
863     }
864   }
865 
866   /** @return true if the client is running */
isClientRunning()867   boolean isClientRunning() {
868     return clientRunning;
869   }
870 
getLastLeaseRenewal()871   long getLastLeaseRenewal() {
872     return lastLeaseRenewal;
873   }
874 
updateLastLeaseRenewal()875   void updateLastLeaseRenewal() {
876     synchronized(filesBeingWritten) {
877       if (filesBeingWritten.isEmpty()) {
878         return;
879       }
880       lastLeaseRenewal = Time.monotonicNow();
881     }
882   }
883 
884   /**
885    * Renew leases.
886    * @return true if lease was renewed. May return false if this
887    * client has been closed or has no files open.
888    **/
renewLease()889   boolean renewLease() throws IOException {
890     if (clientRunning && !isFilesBeingWrittenEmpty()) {
891       try {
892         namenode.renewLease(clientName);
893         updateLastLeaseRenewal();
894         return true;
895       } catch (IOException e) {
896         // Abort if the lease has already expired.
897         final long elapsed = Time.monotonicNow() - getLastLeaseRenewal();
898         if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) {
899           LOG.warn("Failed to renew lease for " + clientName + " for "
900               + (elapsed/1000) + " seconds (>= hard-limit ="
901               + (HdfsConstants.LEASE_HARDLIMIT_PERIOD/1000) + " seconds.) "
902               + "Closing all files being written ...", e);
903           closeAllFilesBeingWritten(true);
904         } else {
905           // Let the lease renewer handle it and retry.
906           throw e;
907         }
908       }
909     }
910     return false;
911   }
912 
913   /**
914    * Close connections the Namenode.
915    */
closeConnectionToNamenode()916   void closeConnectionToNamenode() {
917     RPC.stopProxy(namenode);
918   }
919 
920   /** Close/abort all files being written. */
closeAllFilesBeingWritten(final boolean abort)921   public void closeAllFilesBeingWritten(final boolean abort) {
922     for(;;) {
923       final long inodeId;
924       final DFSOutputStream out;
925       synchronized(filesBeingWritten) {
926         if (filesBeingWritten.isEmpty()) {
927           return;
928         }
929         inodeId = filesBeingWritten.keySet().iterator().next();
930         out = filesBeingWritten.remove(inodeId);
931       }
932       if (out != null) {
933         try {
934           if (abort) {
935             out.abort();
936           } else {
937             out.close();
938           }
939         } catch(IOException ie) {
940           LOG.error("Failed to " + (abort? "abort": "close") +
941                   " inode " + inodeId, ie);
942         }
943       }
944     }
945   }
946 
947   /**
948    * Close the file system, abandoning all of the leases and files being
949    * created and close connections to the namenode.
950    */
951   @Override
close()952   public synchronized void close() throws IOException {
953     if(clientRunning) {
954       closeAllFilesBeingWritten(false);
955       clientRunning = false;
956       getLeaseRenewer().closeClient(this);
957       // close connections to the namenode
958       closeConnectionToNamenode();
959     }
960   }
961 
962   /**
963    * Close all open streams, abandoning all of the leases and files being
964    * created.
965    * @param abort whether streams should be gracefully closed
966    */
closeOutputStreams(boolean abort)967   public void closeOutputStreams(boolean abort) {
968     if (clientRunning) {
969       closeAllFilesBeingWritten(abort);
970     }
971   }
972 
973   /**
974    * Get the default block size for this cluster
975    * @return the default block size in bytes
976    */
getDefaultBlockSize()977   public long getDefaultBlockSize() {
978     return dfsClientConf.defaultBlockSize;
979   }
980 
981   /**
982    * @see ClientProtocol#getPreferredBlockSize(String)
983    */
getBlockSize(String f)984   public long getBlockSize(String f) throws IOException {
985     TraceScope scope = getPathTraceScope("getBlockSize", f);
986     try {
987       return namenode.getPreferredBlockSize(f);
988     } catch (IOException ie) {
989       LOG.warn("Problem getting block size", ie);
990       throw ie;
991     } finally {
992       scope.close();
993     }
994   }
995 
996   /**
997    * Get server default values for a number of configuration params.
998    * @see ClientProtocol#getServerDefaults()
999    */
getServerDefaults()1000   public FsServerDefaults getServerDefaults() throws IOException {
1001     long now = Time.monotonicNow();
1002     if ((serverDefaults == null) ||
1003         (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) {
1004       serverDefaults = namenode.getServerDefaults();
1005       serverDefaultsLastUpdate = now;
1006     }
1007     assert serverDefaults != null;
1008     return serverDefaults;
1009   }
1010 
1011   /**
1012    * Get a canonical token service name for this client's tokens.  Null should
1013    * be returned if the client is not using tokens.
1014    * @return the token service for the client
1015    */
1016   @InterfaceAudience.LimitedPrivate( { "HDFS" })
getCanonicalServiceName()1017   public String getCanonicalServiceName() {
1018     return (dtService != null) ? dtService.toString() : null;
1019   }
1020 
1021   /**
1022    * @see ClientProtocol#getDelegationToken(Text)
1023    */
getDelegationToken(Text renewer)1024   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
1025       throws IOException {
1026     assert dtService != null;
1027     TraceScope scope = Trace.startSpan("getDelegationToken", traceSampler);
1028     try {
1029       Token<DelegationTokenIdentifier> token =
1030         namenode.getDelegationToken(renewer);
1031       if (token != null) {
1032         token.setService(this.dtService);
1033         LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
1034       } else {
1035         LOG.info("Cannot get delegation token from " + renewer);
1036       }
1037       return token;
1038     } finally {
1039       scope.close();
1040     }
1041   }
1042 
1043   /**
1044    * Renew a delegation token
1045    * @param token the token to renew
1046    * @return the new expiration time
1047    * @throws InvalidToken
1048    * @throws IOException
1049    * @deprecated Use Token.renew instead.
1050    */
1051   @Deprecated
renewDelegationToken(Token<DelegationTokenIdentifier> token)1052   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
1053       throws InvalidToken, IOException {
1054     LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
1055     try {
1056       return token.renew(conf);
1057     } catch (InterruptedException ie) {
1058       throw new RuntimeException("caught interrupted", ie);
1059     } catch (RemoteException re) {
1060       throw re.unwrapRemoteException(InvalidToken.class,
1061                                      AccessControlException.class);
1062     }
1063   }
1064 
1065   private static final Map<String, Boolean> localAddrMap = Collections
1066       .synchronizedMap(new HashMap<String, Boolean>());
1067 
isLocalAddress(InetSocketAddress targetAddr)1068   public static boolean isLocalAddress(InetSocketAddress targetAddr) {
1069     InetAddress addr = targetAddr.getAddress();
1070     Boolean cached = localAddrMap.get(addr.getHostAddress());
1071     if (cached != null) {
1072       if (LOG.isTraceEnabled()) {
1073         LOG.trace("Address " + targetAddr +
1074                   (cached ? " is local" : " is not local"));
1075       }
1076       return cached;
1077     }
1078 
1079     boolean local = NetUtils.isLocalAddress(addr);
1080 
1081     if (LOG.isTraceEnabled()) {
1082       LOG.trace("Address " + targetAddr +
1083                 (local ? " is local" : " is not local"));
1084     }
1085     localAddrMap.put(addr.getHostAddress(), local);
1086     return local;
1087   }
1088 
1089   /**
1090    * Cancel a delegation token
1091    * @param token the token to cancel
1092    * @throws InvalidToken
1093    * @throws IOException
1094    * @deprecated Use Token.cancel instead.
1095    */
1096   @Deprecated
cancelDelegationToken(Token<DelegationTokenIdentifier> token)1097   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
1098       throws InvalidToken, IOException {
1099     LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
1100     try {
1101       token.cancel(conf);
1102      } catch (InterruptedException ie) {
1103       throw new RuntimeException("caught interrupted", ie);
1104     } catch (RemoteException re) {
1105       throw re.unwrapRemoteException(InvalidToken.class,
1106                                      AccessControlException.class);
1107     }
1108   }
1109 
1110   @InterfaceAudience.Private
1111   public static class Renewer extends TokenRenewer {
1112 
1113     static {
1114       //Ensure that HDFS Configuration files are loaded before trying to use
1115       // the renewer.
HdfsConfiguration.init()1116       HdfsConfiguration.init();
1117     }
1118 
1119     @Override
handleKind(Text kind)1120     public boolean handleKind(Text kind) {
1121       return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind);
1122     }
1123 
1124     @SuppressWarnings("unchecked")
1125     @Override
renew(Token<?> token, Configuration conf)1126     public long renew(Token<?> token, Configuration conf) throws IOException {
1127       Token<DelegationTokenIdentifier> delToken =
1128         (Token<DelegationTokenIdentifier>) token;
1129       ClientProtocol nn = getNNProxy(delToken, conf);
1130       try {
1131         return nn.renewDelegationToken(delToken);
1132       } catch (RemoteException re) {
1133         throw re.unwrapRemoteException(InvalidToken.class,
1134                                        AccessControlException.class);
1135       }
1136     }
1137 
1138     @SuppressWarnings("unchecked")
1139     @Override
cancel(Token<?> token, Configuration conf)1140     public void cancel(Token<?> token, Configuration conf) throws IOException {
1141       Token<DelegationTokenIdentifier> delToken =
1142           (Token<DelegationTokenIdentifier>) token;
1143       LOG.info("Cancelling " +
1144                DelegationTokenIdentifier.stringifyToken(delToken));
1145       ClientProtocol nn = getNNProxy(delToken, conf);
1146       try {
1147         nn.cancelDelegationToken(delToken);
1148       } catch (RemoteException re) {
1149         throw re.unwrapRemoteException(InvalidToken.class,
1150             AccessControlException.class);
1151       }
1152     }
1153 
getNNProxy( Token<DelegationTokenIdentifier> token, Configuration conf)1154     private static ClientProtocol getNNProxy(
1155         Token<DelegationTokenIdentifier> token, Configuration conf)
1156         throws IOException {
1157       URI uri = HAUtil.getServiceUriFromToken(HdfsConstants.HDFS_URI_SCHEME,
1158               token);
1159       if (HAUtil.isTokenForLogicalUri(token) &&
1160           !HAUtil.isLogicalUri(conf, uri)) {
1161         // If the token is for a logical nameservice, but the configuration
1162         // we have disagrees about that, we can't actually renew it.
1163         // This can be the case in MR, for example, if the RM doesn't
1164         // have all of the HA clusters configured in its configuration.
1165         throw new IOException("Unable to map logical nameservice URI '" +
1166             uri + "' to a NameNode. Local configuration does not have " +
1167             "a failover proxy provider configured.");
1168       }
1169 
1170       NameNodeProxies.ProxyAndInfo<ClientProtocol> info =
1171         NameNodeProxies.createProxy(conf, uri, ClientProtocol.class);
1172       assert info.getDelegationTokenService().equals(token.getService()) :
1173         "Returned service '" + info.getDelegationTokenService().toString() +
1174         "' doesn't match expected service '" +
1175         token.getService().toString() + "'";
1176 
1177       return info.getProxy();
1178     }
1179 
1180     @Override
isManaged(Token<?> token)1181     public boolean isManaged(Token<?> token) throws IOException {
1182       return true;
1183     }
1184 
1185   }
1186 
1187   /**
1188    * Report corrupt blocks that were discovered by the client.
1189    * @see ClientProtocol#reportBadBlocks(LocatedBlock[])
1190    */
reportBadBlocks(LocatedBlock[] blocks)1191   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
1192     namenode.reportBadBlocks(blocks);
1193   }
1194 
1195   public short getDefaultReplication() {
1196     return dfsClientConf.defaultReplication;
1197   }
1198 
1199   public LocatedBlocks getLocatedBlocks(String src, long start)
1200       throws IOException {
1201     return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
1202   }
1203 
1204   /*
1205    * This is just a wrapper around callGetBlockLocations, but non-static so that
1206    * we can stub it out for tests.
1207    */
1208   @VisibleForTesting
1209   public LocatedBlocks getLocatedBlocks(String src, long start, long length)
1210       throws IOException {
1211     TraceScope scope = getPathTraceScope("getBlockLocations", src);
1212     try {
1213       return callGetBlockLocations(namenode, src, start, length);
1214     } finally {
1215       scope.close();
1216     }
1217   }
1218 
1219   /**
1220    * @see ClientProtocol#getBlockLocations(String, long, long)
1221    */
1222   static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
1223       String src, long start, long length)
1224       throws IOException {
1225     try {
1226       return namenode.getBlockLocations(src, start, length);
1227     } catch(RemoteException re) {
1228       throw re.unwrapRemoteException(AccessControlException.class,
1229                                      FileNotFoundException.class,
1230                                      UnresolvedPathException.class);
1231     }
1232   }
1233 
1234   /**
1235    * Recover a file's lease
1236    * @param src a file's path
1237    * @return true if the file is already closed
1238    * @throws IOException
1239    */
1240   boolean recoverLease(String src) throws IOException {
1241     checkOpen();
1242 
1243     TraceScope scope = getPathTraceScope("recoverLease", src);
1244     try {
1245       return namenode.recoverLease(src, clientName);
1246     } catch (RemoteException re) {
1247       throw re.unwrapRemoteException(FileNotFoundException.class,
1248                                      AccessControlException.class,
1249                                      UnresolvedPathException.class);
1250     } finally {
1251       scope.close();
1252     }
1253   }
1254 
1255   /**
1256    * Get block location info about file
1257    *
1258    * getBlockLocations() returns a list of hostnames that store
1259    * data for a specific file region.  It returns a set of hostnames
1260    * for every block within the indicated region.
1261    *
1262    * This function is very useful when writing code that considers
1263    * data-placement when performing operations.  For example, the
1264    * MapReduce system tries to schedule tasks on the same machines
1265    * as the data-block the task processes.
1266    */
1267   public BlockLocation[] getBlockLocations(String src, long start,
1268         long length) throws IOException, UnresolvedLinkException {
1269     TraceScope scope = getPathTraceScope("getBlockLocations", src);
1270     try {
1271       LocatedBlocks blocks = getLocatedBlocks(src, start, length);
1272       BlockLocation[] locations =  DFSUtil.locatedBlocks2Locations(blocks);
1273       HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
1274       for (int i = 0; i < locations.length; i++) {
1275         hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
1276       }
1277       return hdfsLocations;
1278     } finally {
1279       scope.close();
1280     }
1281   }
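  /*
   * Illustrative usage sketch (editorial addition, not part of the original
   * class): a caller holding a constructed DFSClient could inspect data
   * placement for a byte range. The instance name "client" and the path
   * "/tmp/data" are hypothetical.
   *
   *   BlockLocation[] locs = client.getBlockLocations("/tmp/data", 0L, 1024L);
   *   for (BlockLocation loc : locs) {
   *     System.out.println(loc.getOffset() + "+" + loc.getLength() + " -> "
   *         + java.util.Arrays.toString(loc.getHosts()));
   *   }
   */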
1282 
1283   /**
1284    * Get block location information about a list of {@link HdfsBlockLocation}.
1285    * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
1286    * get {@link BlockStorageLocation}s for blocks returned by
1287    * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
1288    * .
1289    *
1290    * This is done by making a round of RPCs to the associated datanodes, asking
1291    * for the volume of each block replica. The returned array of
1292    * {@link BlockStorageLocation} exposes this information as a
1293    * {@link VolumeId}.
1294    *
1295    * @param blockLocations
1296    *          target blocks on which to query volume location information
1297    * @return volumeBlockLocations original block array augmented with additional
1298    *         volume location information for each replica.
1299    */
1300   public BlockStorageLocation[] getBlockStorageLocations(
1301       List<BlockLocation> blockLocations) throws IOException,
1302       UnsupportedOperationException, InvalidBlockTokenException {
1303     if (!getConf().getHdfsBlocksMetadataEnabled) {
1304       throw new UnsupportedOperationException("Datanode-side support for " +
1305           "getVolumeBlockLocations() must also be enabled in the client " +
1306           "configuration.");
1307     }
1308     // Downcast blockLocations and fetch out required LocatedBlock(s)
1309     List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
1310     for (BlockLocation loc : blockLocations) {
1311       if (!(loc instanceof HdfsBlockLocation)) {
1312         throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
1313             "expected to be passed HdfsBlockLocations");
1314       }
1315       HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
1316       blocks.add(hdfsLoc.getLocatedBlock());
1317     }
1318 
1319     // Re-group the LocatedBlocks to be grouped by datanodes, with the values
1320     // a list of the LocatedBlocks on the datanode.
1321     Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
1322         new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
1323     for (LocatedBlock b : blocks) {
1324       for (DatanodeInfo info : b.getLocations()) {
1325         if (!datanodeBlocks.containsKey(info)) {
1326           datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
1327         }
1328         List<LocatedBlock> l = datanodeBlocks.get(info);
1329         l.add(b);
1330       }
1331     }
1332 
1333     // Make RPCs to the datanodes to get volume locations for its replicas
1334     TraceScope scope =
1335       Trace.startSpan("getBlockStorageLocations", traceSampler);
1336     Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
1337     try {
1338       metadatas = BlockStorageLocationUtil.
1339           queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
1340               getConf().getFileBlockStorageLocationsNumThreads,
1341               getConf().getFileBlockStorageLocationsTimeoutMs,
1342               getConf().connectToDnViaHostname);
1343       if (LOG.isTraceEnabled()) {
1344         LOG.trace("metadata returned: "
1345             + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
1346       }
1347     } finally {
1348       scope.close();
1349     }
1350 
1351     // Regroup the returned VolumeId metadata to again be grouped by
1352     // LocatedBlock rather than by datanode
1353     Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
1354         .associateVolumeIdsWithBlocks(blocks, metadatas);
1355 
1356     // Combine original BlockLocations with new VolumeId information
1357     BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
1358         .convertToVolumeBlockLocations(blocks, blockVolumeIds);
1359 
1360     return volumeBlockLocations;
1361   }
1362 
1363   /**
1364    * Decrypts an EDEK by consulting the KeyProvider.
1365    */
1366   private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
1367       feInfo) throws IOException {
1368     TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler);
1369     try {
1370       KeyProvider provider = getKeyProvider();
1371       if (provider == null) {
1372         throw new IOException("No KeyProvider is configured, cannot access" +
1373             " an encrypted file");
1374       }
1375       EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
1376           feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
1377           feInfo.getEncryptedDataEncryptionKey());
1378       try {
1379         KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
1380             .createKeyProviderCryptoExtension(provider);
1381         return cryptoProvider.decryptEncryptedKey(ekv);
1382       } catch (GeneralSecurityException e) {
1383         throw new IOException(e);
1384       }
1385     } finally {
1386       scope.close();
1387     }
1388   }
1389 
1390   /**
1391    * Obtain the crypto protocol version from the provided FileEncryptionInfo,
1392    * checking to see if this version is supported by the client.
1393    *
1394    * @param feInfo FileEncryptionInfo
1395    * @return CryptoProtocolVersion from the feInfo
1396    * @throws IOException if the protocol version is unsupported.
1397    */
1398   private static CryptoProtocolVersion getCryptoProtocolVersion
1399       (FileEncryptionInfo feInfo) throws IOException {
1400     final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
1401     if (!CryptoProtocolVersion.supports(version)) {
1402       throw new IOException("Client does not support specified " +
1403           "CryptoProtocolVersion " + version.getDescription() + " version " +
1404           "number " + version.getVersion());
1405     }
1406     return version;
1407   }
1408 
1409   /**
1410    * Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo
1411    * and the available CryptoCodecs configured in the Configuration.
1412    *
1413    * @param conf   Configuration
1414    * @param feInfo FileEncryptionInfo
1415    * @return CryptoCodec
1416    * @throws IOException if no suitable CryptoCodec for the CipherSuite is
1417    *                     available.
1418    */
1419   private static CryptoCodec getCryptoCodec(Configuration conf,
1420       FileEncryptionInfo feInfo) throws IOException {
1421     final CipherSuite suite = feInfo.getCipherSuite();
1422     if (suite.equals(CipherSuite.UNKNOWN)) {
1423       throw new IOException("NameNode specified unknown CipherSuite with ID "
1424           + suite.getUnknownValue() + ", cannot instantiate CryptoCodec.");
1425     }
1426     final CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
1427     if (codec == null) {
1428       throw new UnknownCipherSuiteException(
1429           "No configuration found for the cipher suite "
1430           + suite.getConfigSuffix() + " prefixed with "
1431           + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
1432           + ". Please see the example configuration "
1433           + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
1434           + "at core-default.xml for details.");
1435     }
1436     return codec;
1437   }
1438 
1439   /**
1440    * Wraps the stream in a CryptoInputStream if the underlying file is
1441    * encrypted.
1442    */
1443   public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
1444       throws IOException {
1445     final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
1446     if (feInfo != null) {
1447       // File is encrypted, wrap the stream in a crypto stream.
1448       // Currently only one version, so no special logic based on the version #
1449       getCryptoProtocolVersion(feInfo);
1450       final CryptoCodec codec = getCryptoCodec(conf, feInfo);
1451       final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
1452       final CryptoInputStream cryptoIn =
1453           new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
1454               feInfo.getIV());
1455       return new HdfsDataInputStream(cryptoIn);
1456     } else {
1457       // No FileEncryptionInfo so no encryption.
1458       return new HdfsDataInputStream(dfsis);
1459     }
1460   }
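  /*
   * Illustrative read path (editorial sketch): open() returns the raw
   * DFSInputStream and createWrappedInputStream() layers transparent
   * decryption on top when the file is in an encryption zone. "client" and
   * "/tmp/secret" are hypothetical names.
   *
   *   HdfsDataInputStream in =
   *       client.createWrappedInputStream(client.open("/tmp/secret"));
   *   try {
   *     byte[] buf = new byte[4096];
   *     int n = in.read(buf);   // plaintext if encrypted, raw bytes otherwise
   *   } finally {
   *     in.close();
   *   }
   */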
1461 
1462   /**
1463    * Wraps the stream in a CryptoOutputStream if the underlying file is
1464    * encrypted.
1465    */
1466   public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
1467       FileSystem.Statistics statistics) throws IOException {
1468     return createWrappedOutputStream(dfsos, statistics, 0);
1469   }
1470 
1471   /**
1472    * Wraps the stream in a CryptoOutputStream if the underlying file is
1473    * encrypted.
1474    */
1475   public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
1476       FileSystem.Statistics statistics, long startPos) throws IOException {
1477     final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo();
1478     if (feInfo != null) {
1479       // File is encrypted, wrap the stream in a crypto stream.
1480       // Currently only one version, so no special logic based on the version #
1481       getCryptoProtocolVersion(feInfo);
1482       final CryptoCodec codec = getCryptoCodec(conf, feInfo);
1483       KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
1484       final CryptoOutputStream cryptoOut =
1485           new CryptoOutputStream(dfsos, codec,
1486               decrypted.getMaterial(), feInfo.getIV(), startPos);
1487       return new HdfsDataOutputStream(cryptoOut, statistics, startPos);
1488     } else {
1489       // No FileEncryptionInfo present so no encryption.
1490       return new HdfsDataOutputStream(dfsos, statistics, startPos);
1491     }
1492   }
1493 
1494   public DFSInputStream open(String src)
1495       throws IOException, UnresolvedLinkException {
1496     return open(src, dfsClientConf.ioBufferSize, true, null);
1497   }
1498 
1499   /**
1500    * Create an input stream that obtains a nodelist from the
1501    * namenode, and then reads from all the right places.  Creates
1502    * inner subclass of InputStream that does the right out-of-band
1503    * work.
1504    * @deprecated Use {@link #open(String, int, boolean)} instead.
1505    */
1506   @Deprecated
1507   public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
1508                              FileSystem.Statistics stats)
1509       throws IOException, UnresolvedLinkException {
1510     return open(src, buffersize, verifyChecksum);
1511   }
1512 
1513 
1514   /**
1515    * Create an input stream that obtains a nodelist from the
1516    * namenode, and then reads from all the right places.  Creates
1517    * inner subclass of InputStream that does the right out-of-band
1518    * work.
1519    */
1520   public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
1521       throws IOException, UnresolvedLinkException {
1522     checkOpen();
1523     //    Get block info from namenode
1524     TraceScope scope = getPathTraceScope("newDFSInputStream", src);
1525     try {
1526       return new DFSInputStream(this, src, verifyChecksum);
1527     } finally {
1528       scope.close();
1529     }
1530   }
1531 
1532   /**
1533    * Get the namenode associated with this DFSClient object
1534    * @return the namenode associated with this DFSClient object
1535    */
1536   public ClientProtocol getNamenode() {
1537     return namenode;
1538   }
1539 
1540   /**
1541    * Call {@link #create(String, boolean, short, long, Progressable)} with
1542    * default <code>replication</code> and <code>blockSize</code> and null <code>
1543    * progress</code>.
1544    */
1545   public OutputStream create(String src, boolean overwrite)
1546       throws IOException {
1547     return create(src, overwrite, dfsClientConf.defaultReplication,
1548         dfsClientConf.defaultBlockSize, null);
1549   }
1550 
1551   /**
1552    * Call {@link #create(String, boolean, short, long, Progressable)} with
1553    * default <code>replication</code> and <code>blockSize</code>.
1554    */
1555   public OutputStream create(String src,
1556                              boolean overwrite,
1557                              Progressable progress) throws IOException {
1558     return create(src, overwrite, dfsClientConf.defaultReplication,
1559         dfsClientConf.defaultBlockSize, progress);
1560   }
1561 
1562   /**
1563    * Call {@link #create(String, boolean, short, long, Progressable)} with
1564    * null <code>progress</code>.
1565    */
1566   public OutputStream create(String src,
1567                              boolean overwrite,
1568                              short replication,
1569                              long blockSize) throws IOException {
1570     return create(src, overwrite, replication, blockSize, null);
1571   }
1572 
1573   /**
1574    * Call {@link #create(String, boolean, short, long, Progressable, int)}
1575    * with default bufferSize.
1576    */
1577   public OutputStream create(String src, boolean overwrite, short replication,
1578       long blockSize, Progressable progress) throws IOException {
1579     return create(src, overwrite, replication, blockSize, progress,
1580         dfsClientConf.ioBufferSize);
1581   }
1582 
1583   /**
1584    * Call {@link #create(String, FsPermission, EnumSet, short, long,
1585    * Progressable, int, ChecksumOpt)} with default <code>permission</code>
1586    * {@link FsPermission#getFileDefault()}.
1587    *
1588    * @param src File name
1589    * @param overwrite overwrite an existing file if true
1590    * @param replication replication factor for the file
1591    * @param blockSize maximum block size
1592    * @param progress interface for reporting client progress
1593    * @param buffersize underlying buffersize
1594    *
1595    * @return output stream
1596    */
1597   public OutputStream create(String src,
1598                              boolean overwrite,
1599                              short replication,
1600                              long blockSize,
1601                              Progressable progress,
1602                              int buffersize)
1603       throws IOException {
1604     return create(src, FsPermission.getFileDefault(),
1605         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1606             : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
1607         buffersize, null);
1608   }
1609 
1610   /**
1611    * Call {@link #create(String, FsPermission, EnumSet, boolean, short,
1612    * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
1613    *  set to true.
1614    */
1615   public DFSOutputStream create(String src,
1616                              FsPermission permission,
1617                              EnumSet<CreateFlag> flag,
1618                              short replication,
1619                              long blockSize,
1620                              Progressable progress,
1621                              int buffersize,
1622                              ChecksumOpt checksumOpt)
1623       throws IOException {
1624     return create(src, permission, flag, true,
1625         replication, blockSize, progress, buffersize, checksumOpt, null);
1626   }
1627 
1628   /**
1629    * Create a new dfs file with the specified block replication
1630    * with write-progress reporting and return an output stream for writing
1631    * into the file.
1632    *
1633    * @param src File name
1634    * @param permission The permission of the directory being created.
1635    *          If null, use default permission {@link FsPermission#getFileDefault()}
1636    * @param flag indicates create a new file or create/overwrite an
1637    *          existing file or append to an existing file
1638    * @param createParent create missing parent directory if true
1639    * @param replication block replication
1640    * @param blockSize maximum block size
1641    * @param progress interface for reporting client progress
1642    * @param buffersize underlying buffer size
1643    * @param checksumOpt checksum options
1644    *
1645    * @return output stream
1646    *
1647    * @see ClientProtocol#create for detailed description of exceptions thrown
1648    */
1649   public DFSOutputStream create(String src,
1650                              FsPermission permission,
1651                              EnumSet<CreateFlag> flag,
1652                              boolean createParent,
1653                              short replication,
1654                              long blockSize,
1655                              Progressable progress,
1656                              int buffersize,
1657                              ChecksumOpt checksumOpt) throws IOException {
1658     return create(src, permission, flag, createParent, replication, blockSize,
1659         progress, buffersize, checksumOpt, null);
1660   }
1661 
1662   /**
1663    * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
1664    * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
1665    * a hint to where the namenode should place the file blocks.
1666    * The favored nodes hint is not persisted in HDFS. Hence it may be honored
1667    * at the creation time only. HDFS could later move the blocks away from the
1668    * favored nodes during balancing or re-replication. A value of null means
1669    * no favored nodes for this create.
1670    */
1671   public DFSOutputStream create(String src,
1672                              FsPermission permission,
1673                              EnumSet<CreateFlag> flag,
1674                              boolean createParent,
1675                              short replication,
1676                              long blockSize,
1677                              Progressable progress,
1678                              int buffersize,
1679                              ChecksumOpt checksumOpt,
1680                              InetSocketAddress[] favoredNodes) throws IOException {
1681     checkOpen();
1682     if (permission == null) {
1683       permission = FsPermission.getFileDefault();
1684     }
1685     FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
1686     if(LOG.isDebugEnabled()) {
1687       LOG.debug(src + ": masked=" + masked);
1688     }
1689     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
1690         src, masked, flag, createParent, replication, blockSize, progress,
1691         buffersize, dfsClientConf.createChecksum(checksumOpt),
1692         getFavoredNodesStr(favoredNodes));
1693     beginFileLease(result.getFileId(), result);
1694     return result;
1695   }
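  /*
   * Illustrative create call (editorial sketch): most applications go through
   * DistributedFileSystem, but a direct invocation might look like the
   * following. "client" and "/tmp/out" are hypothetical; replication 3 and a
   * 128 MB block size are common defaults, not requirements.
   *
   *   DFSOutputStream out = client.create("/tmp/out",
   *       FsPermission.getFileDefault(),
   *       EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
   *       true,                  // createParent
   *       (short) 3,             // replication
   *       128L * 1024 * 1024,    // blockSize
   *       null,                  // progress
   *       4096,                  // buffersize
   *       null);                 // checksumOpt, falls back to client default
   *   out.write("hello".getBytes("UTF-8"));
   *   out.close();
   */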
1696 
1697   private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) {
1698     String[] favoredNodeStrs = null;
1699     if (favoredNodes != null) {
1700       favoredNodeStrs = new String[favoredNodes.length];
1701       for (int i = 0; i < favoredNodes.length; i++) {
1702         favoredNodeStrs[i] =
1703             favoredNodes[i].getHostName() + ":"
1704                          + favoredNodes[i].getPort();
1705       }
1706     }
1707     return favoredNodeStrs;
1708   }
1709 
1710   /**
1711    * Append to an existing file if {@link CreateFlag#APPEND} is present
1712    */
1713   private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
1714       int buffersize, Progressable progress) throws IOException {
1715     if (flag.contains(CreateFlag.APPEND)) {
1716       HdfsFileStatus stat = getFileInfo(src);
1717       if (stat == null) { // No file to append to
1718         // New file needs to be created if create option is present
1719         if (!flag.contains(CreateFlag.CREATE)) {
1720           throw new FileNotFoundException("failed to append to non-existent file "
1721               + src + " on client " + clientName);
1722         }
1723         return null;
1724       }
1725       return callAppend(src, buffersize, flag, progress, null);
1726     }
1727     return null;
1728   }
1729 
1730   /**
1731    * Same as {@link #create(String, FsPermission, EnumSet, short, long,
1732    *  Progressable, int, ChecksumOpt)} except that the permission
1733    *  is absolute (i.e. it has already been masked with the umask).
1734    */
1735   public DFSOutputStream primitiveCreate(String src,
1736                              FsPermission absPermission,
1737                              EnumSet<CreateFlag> flag,
1738                              boolean createParent,
1739                              short replication,
1740                              long blockSize,
1741                              Progressable progress,
1742                              int buffersize,
1743                              ChecksumOpt checksumOpt)
1744       throws IOException, UnresolvedLinkException {
1745     checkOpen();
1746     CreateFlag.validate(flag);
1747     DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
1748     if (result == null) {
1749       DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
1750       result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
1751           flag, createParent, replication, blockSize, progress, buffersize,
1752           checksum, null);
1753     }
1754     beginFileLease(result.getFileId(), result);
1755     return result;
1756   }
1757 
1758   /**
1759    * Creates a symbolic link.
1760    *
1761    * @see ClientProtocol#createSymlink(String, String,FsPermission, boolean)
1762    */
1763   public void createSymlink(String target, String link, boolean createParent)
1764       throws IOException {
1765     TraceScope scope = getPathTraceScope("createSymlink", target);
1766     try {
1767       FsPermission dirPerm =
1768           FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
1769       namenode.createSymlink(target, link, dirPerm, createParent);
1770     } catch (RemoteException re) {
1771       throw re.unwrapRemoteException(AccessControlException.class,
1772                                      FileAlreadyExistsException.class,
1773                                      FileNotFoundException.class,
1774                                      ParentNotDirectoryException.class,
1775                                      NSQuotaExceededException.class,
1776                                      DSQuotaExceededException.class,
1777                                      UnresolvedPathException.class,
1778                                      SnapshotAccessControlException.class);
1779     } finally {
1780       scope.close();
1781     }
1782   }
1783 
1784   /**
1785    * Resolve the *first* symlink, if any, in the path.
1786    *
1787    * @see ClientProtocol#getLinkTarget(String)
1788    */
1789   public String getLinkTarget(String path) throws IOException {
1790     checkOpen();
1791     TraceScope scope = getPathTraceScope("getLinkTarget", path);
1792     try {
1793       return namenode.getLinkTarget(path);
1794     } catch (RemoteException re) {
1795       throw re.unwrapRemoteException(AccessControlException.class,
1796                                      FileNotFoundException.class);
1797     } finally {
1798       scope.close();
1799     }
1800   }
1801 
1802   /** Method to get stream returned by append call */
1803   private DFSOutputStream callAppend(String src, int buffersize,
1804       EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
1805       throws IOException {
1806     CreateFlag.validateForAppend(flag);
1807     try {
1808       LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
1809           new EnumSetWritable<>(flag, CreateFlag.class));
1810       HdfsFileStatus status = blkWithStatus.getFileStatus();
1811       if (status == null) {
1812         DFSClient.LOG.debug("NameNode is on an older version, request file " +
1813             "info with additional RPC call for file: " + src);
1814         status = getFileInfo(src);
1815       }
1816       return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
1817           progress, blkWithStatus.getLastBlock(),
1818           status, dfsClientConf.createChecksum(),
1819           favoredNodes);
1820     } catch(RemoteException re) {
1821       throw re.unwrapRemoteException(AccessControlException.class,
1822                                      FileNotFoundException.class,
1823                                      SafeModeException.class,
1824                                      DSQuotaExceededException.class,
1825                                      UnsupportedOperationException.class,
1826                                      UnresolvedPathException.class,
1827                                      SnapshotAccessControlException.class);
1828     }
1829   }
1830 
1831   /**
1832    * Append to an existing HDFS file.
1833    *
1834    * @param src file name
1835    * @param buffersize buffer size
1836    * @param flag indicates whether to append data to a new block instead of
1837    *             the last block
1838    * @param progress for reporting write-progress; null is acceptable.
1839    * @param statistics file system statistics; null is acceptable.
1840    * @return an output stream for writing into the file
1841    *
1842    * @see ClientProtocol#append(String, String, EnumSetWritable)
1843    */
1844   public HdfsDataOutputStream append(final String src, final int buffersize,
1845       EnumSet<CreateFlag> flag, final Progressable progress,
1846       final FileSystem.Statistics statistics) throws IOException {
1847     final DFSOutputStream out = append(src, buffersize, flag, null, progress);
1848     return createWrappedOutputStream(out, statistics, out.getInitialLen());
1849   }
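  /*
   * Illustrative append call (editorial sketch): the returned stream is
   * already wrapped for encryption-zone files, so callers can write to it
   * directly. "client" and "/tmp/log" are hypothetical names.
   *
   *   HdfsDataOutputStream out = client.append("/tmp/log", 4096,
   *       EnumSet.of(CreateFlag.APPEND), null, null);
   *   out.write("one more line\n".getBytes("UTF-8"));
   *   out.close();
   */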
1850 
1851   /**
1852    * Append to an existing HDFS file.
1853    *
1854    * @param src file name
1855    * @param buffersize buffer size
1856    * @param flag indicates whether to append data to a new block instead of the
1857    *          last block
1858    * @param progress for reporting write-progress; null is acceptable.
1859    * @param statistics file system statistics; null is acceptable.
1860    * @param favoredNodes FavoredNodes for new blocks
1861    * @return an output stream for writing into the file
1862    * @see ClientProtocol#append(String, String, EnumSetWritable)
1863    */
1864   public HdfsDataOutputStream append(final String src, final int buffersize,
1865       EnumSet<CreateFlag> flag, final Progressable progress,
1866       final FileSystem.Statistics statistics,
1867       final InetSocketAddress[] favoredNodes) throws IOException {
1868     final DFSOutputStream out = append(src, buffersize, flag,
1869         getFavoredNodesStr(favoredNodes), progress);
1870     return createWrappedOutputStream(out, statistics, out.getInitialLen());
1871   }
1872 
1873   private DFSOutputStream append(String src, int buffersize,
1874       EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress)
1875       throws IOException {
1876     checkOpen();
1877     final DFSOutputStream result = callAppend(src, buffersize, flag, progress,
1878         favoredNodes);
1879     beginFileLease(result.getFileId(), result);
1880     return result;
1881   }
1882 
1883   /**
1884    * Set replication for an existing file.
1885    * @param src file name
1886    * @param replication replication to set the file to
1887    *
1888    * @see ClientProtocol#setReplication(String, short)
1889    */
1890   public boolean setReplication(String src, short replication)
1891       throws IOException {
1892     TraceScope scope = getPathTraceScope("setReplication", src);
1893     try {
1894       return namenode.setReplication(src, replication);
1895     } catch(RemoteException re) {
1896       throw re.unwrapRemoteException(AccessControlException.class,
1897                                      FileNotFoundException.class,
1898                                      SafeModeException.class,
1899                                      DSQuotaExceededException.class,
1900                                      UnresolvedPathException.class,
1901                                      SnapshotAccessControlException.class);
1902     } finally {
1903       scope.close();
1904     }
1905   }
1906 
1907   /**
1908    * Set storage policy for an existing file/directory
1909    * @param src file/directory name
1910    * @param policyName name of the storage policy
1911    */
1912   public void setStoragePolicy(String src, String policyName)
1913       throws IOException {
1914     TraceScope scope = getPathTraceScope("setStoragePolicy", src);
1915     try {
1916       namenode.setStoragePolicy(src, policyName);
1917     } catch (RemoteException e) {
1918       throw e.unwrapRemoteException(AccessControlException.class,
1919                                     FileNotFoundException.class,
1920                                     SafeModeException.class,
1921                                     NSQuotaExceededException.class,
1922                                     UnresolvedPathException.class,
1923                                     SnapshotAccessControlException.class);
1924     } finally {
1925       scope.close();
1926     }
1927   }
1928 
1929   /**
1930    * @return All the existing storage policies
1931    */
1932   public BlockStoragePolicy[] getStoragePolicies() throws IOException {
1933     TraceScope scope = Trace.startSpan("getStoragePolicies", traceSampler);
1934     try {
1935       return namenode.getStoragePolicies();
1936     } finally {
1937       scope.close();
1938     }
1939   }
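  /*
   * Illustrative storage policy usage (editorial sketch): list the policies
   * the NameNode exposes and pin a path to one of them. "client", "/tmp/cold"
   * and the policy name "COLD" are assumptions; actual names come from the
   * cluster's configured policy suite.
   *
   *   for (BlockStoragePolicy p : client.getStoragePolicies()) {
   *     System.out.println(p.getId() + " " + p.getName());
   *   }
   *   client.setStoragePolicy("/tmp/cold", "COLD");
   */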
1940 
1941   /**
1942    * Rename file or directory.
1943    * @see ClientProtocol#rename(String, String)
1944    * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
1945    */
1946   @Deprecated
1947   public boolean rename(String src, String dst) throws IOException {
1948     checkOpen();
1949     TraceScope scope = getSrcDstTraceScope("rename", src, dst);
1950     try {
1951       return namenode.rename(src, dst);
1952     } catch(RemoteException re) {
1953       throw re.unwrapRemoteException(AccessControlException.class,
1954                                      NSQuotaExceededException.class,
1955                                      DSQuotaExceededException.class,
1956                                      UnresolvedPathException.class,
1957                                      SnapshotAccessControlException.class);
1958     } finally {
1959       scope.close();
1960     }
1961   }
1962 
1963   /**
1964    * Move blocks from srcs to trg and delete srcs.
1965    * See {@link ClientProtocol#concat}.
1966    */
1967   public void concat(String trg, String [] srcs) throws IOException {
1968     checkOpen();
1969     TraceScope scope = Trace.startSpan("concat", traceSampler);
1970     try {
1971       namenode.concat(trg, srcs);
1972     } catch(RemoteException re) {
1973       throw re.unwrapRemoteException(AccessControlException.class,
1974                                      UnresolvedPathException.class,
1975                                      SnapshotAccessControlException.class);
1976     } finally {
1977       scope.close();
1978     }
1979   }
1980   /**
1981    * Rename file or directory.
1982    * @see ClientProtocol#rename2(String, String, Options.Rename...)
1983    */
1984   public void rename(String src, String dst, Options.Rename... options)
1985       throws IOException {
1986     checkOpen();
1987     TraceScope scope = getSrcDstTraceScope("rename2", src, dst);
1988     try {
1989       namenode.rename2(src, dst, options);
1990     } catch(RemoteException re) {
1991       throw re.unwrapRemoteException(AccessControlException.class,
1992                                      DSQuotaExceededException.class,
1993                                      FileAlreadyExistsException.class,
1994                                      FileNotFoundException.class,
1995                                      ParentNotDirectoryException.class,
1996                                      SafeModeException.class,
1997                                      NSQuotaExceededException.class,
1998                                      UnresolvedPathException.class,
1999                                      SnapshotAccessControlException.class);
2000     } finally {
2001       scope.close();
2002     }
2003   }
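  /*
   * Illustrative rename (editorial sketch): unlike the deprecated rename(),
   * this overload can overwrite an existing destination when
   * Options.Rename.OVERWRITE is passed. "client" and both paths are
   * hypothetical.
   *
   *   client.rename("/tmp/staging/part-0000", "/tmp/final/part-0000",
   *       Options.Rename.OVERWRITE);
   */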
2004 
2005   /**
2006    * Truncate a file to an indicated size
2007    * See {@link ClientProtocol#truncate}.
2008    */
2009   public boolean truncate(String src, long newLength) throws IOException {
2010     checkOpen();
2011     if (newLength < 0) {
2012       throw new HadoopIllegalArgumentException(
2013           "Cannot truncate to a negative file size: " + newLength + ".");
2014     }
2015     try {
2016       return namenode.truncate(src, newLength, clientName);
2017     } catch (RemoteException re) {
2018       throw re.unwrapRemoteException(AccessControlException.class,
2019           UnresolvedPathException.class);
2020     }
2021   }
2022 
2023   /**
2024    * Delete file or directory.
2025    * See {@link ClientProtocol#delete(String, boolean)}.
2026    */
2027   @Deprecated
2028   public boolean delete(String src) throws IOException {
2029     checkOpen();
2030     return delete(src, true);
2031   }
2032 
2033   /**
2034    * Delete file or directory.
2035    * Deletes the contents of the directory if it is non-empty and recursive
2036    * is set to true.
2037    *
2038    * @see ClientProtocol#delete(String, boolean)
2039    */
2040   public boolean delete(String src, boolean recursive) throws IOException {
2041     checkOpen();
2042     TraceScope scope = getPathTraceScope("delete", src);
2043     try {
2044       return namenode.delete(src, recursive);
2045     } catch(RemoteException re) {
2046       throw re.unwrapRemoteException(AccessControlException.class,
2047                                      FileNotFoundException.class,
2048                                      SafeModeException.class,
2049                                      UnresolvedPathException.class,
2050                                      SnapshotAccessControlException.class);
2051     } finally {
2052       scope.close();
2053     }
2054   }
2055 
2056   /** Implemented using getFileInfo(src)
2057    */
2058   public boolean exists(String src) throws IOException {
2059     checkOpen();
2060     return getFileInfo(src) != null;
2061   }
2062 
2063   /**
2064    * Get a partial listing of the indicated directory.
2065    * No block locations need to be fetched.
2066    */
2067   public DirectoryListing listPaths(String src,  byte[] startAfter)
2068     throws IOException {
2069     return listPaths(src, startAfter, false);
2070   }
2071 
2072   /**
2073    * Get a partial listing of the indicated directory.
2074    *
2075    * It is recommended to use HdfsFileStatus.EMPTY_NAME as startAfter
2076    * if the application wants to fetch a listing starting from
2077    * the first entry in the directory.
2078    *
2079    * @see ClientProtocol#getListing(String, byte[], boolean)
2080    */
2081   public DirectoryListing listPaths(String src,  byte[] startAfter,
2082       boolean needLocation) throws IOException {
2083     checkOpen();
2084     TraceScope scope = getPathTraceScope("listPaths", src);
2085     try {
2086       return namenode.getListing(src, startAfter, needLocation);
2087     } catch(RemoteException re) {
2088       throw re.unwrapRemoteException(AccessControlException.class,
2089                                      FileNotFoundException.class,
2090                                      UnresolvedPathException.class);
2091     } finally {
2092       scope.close();
2093     }
2094   }
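  /*
   * Illustrative paging loop (editorial sketch): the listing is partial, so a
   * caller starts at HdfsFileStatus.EMPTY_NAME and feeds the last returned
   * name back in until hasMore() is false. "client" and "/tmp" are
   * hypothetical.
   *
   *   byte[] cookie = HdfsFileStatus.EMPTY_NAME;
   *   DirectoryListing page;
   *   do {
   *     page = client.listPaths("/tmp", cookie);
   *     for (HdfsFileStatus stat : page.getPartialListing()) {
   *       System.out.println(stat.getLocalName());
   *     }
   *     cookie = page.getLastName();
   *   } while (page.hasMore());
   */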
2095 
2096   /**
2097    * Get the file info for a specific file or directory.
2098    * @param src The string representation of the path to the file
2099    * @return object containing information regarding the file
2100    *         or null if file not found
2101    *
2102    * @see ClientProtocol#getFileInfo(String) for description of exceptions
2103    */
2104   public HdfsFileStatus getFileInfo(String src) throws IOException {
2105     checkOpen();
2106     TraceScope scope = getPathTraceScope("getFileInfo", src);
2107     try {
2108       return namenode.getFileInfo(src);
2109     } catch(RemoteException re) {
2110       throw re.unwrapRemoteException(AccessControlException.class,
2111                                      FileNotFoundException.class,
2112                                      UnresolvedPathException.class);
2113     } finally {
2114       scope.close();
2115     }
2116   }
2117 
2118   /**
2119    * Get the close status of a file.
2120    * @return true if file is already closed
2121    */
2122   public boolean isFileClosed(String src) throws IOException{
2123     checkOpen();
2124     TraceScope scope = getPathTraceScope("isFileClosed", src);
2125     try {
2126       return namenode.isFileClosed(src);
2127     } catch(RemoteException re) {
2128       throw re.unwrapRemoteException(AccessControlException.class,
2129                                      FileNotFoundException.class,
2130                                      UnresolvedPathException.class);
2131     } finally {
2132       scope.close();
2133     }
2134   }
2135 
2136   /**
2137    * Get the file info for a specific file or directory. If src
2138    * refers to a symlink then the FileStatus of the link is returned.
2139    * @param src path to a file or directory.
2140    *
2141    * For description of exceptions thrown
2142    * @see ClientProtocol#getFileLinkInfo(String)
2143    */
2144   public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
2145     checkOpen();
2146     TraceScope scope = getPathTraceScope("getFileLinkInfo", src);
2147     try {
2148       return namenode.getFileLinkInfo(src);
2149     } catch(RemoteException re) {
2150       throw re.unwrapRemoteException(AccessControlException.class,
2151                                      UnresolvedPathException.class);
2152     } finally {
2153       scope.close();
2154     }
2155    }
2156 
2157   @InterfaceAudience.Private
2158   public void clearDataEncryptionKey() {
2159     LOG.debug("Clearing encryption key");
2160     synchronized (this) {
2161       encryptionKey = null;
2162     }
2163   }
2164 
2165   /**
2166    * @return true if data sent between this client and DNs should be encrypted,
2167    *         false otherwise.
2168    * @throws IOException in the event of error communicating with the NN
2169    */
2170   boolean shouldEncryptData() throws IOException {
2171     FsServerDefaults d = getServerDefaults();
2172     return d == null ? false : d.getEncryptDataTransfer();
2173   }
2174 
2175   @Override
2176   public DataEncryptionKey newDataEncryptionKey() throws IOException {
2177     if (shouldEncryptData()) {
2178       synchronized (this) {
2179         if (encryptionKey == null ||
2180             encryptionKey.expiryDate < Time.now()) {
2181           LOG.debug("Getting new encryption token from NN");
2182           encryptionKey = namenode.getDataEncryptionKey();
2183         }
2184         return encryptionKey;
2185       }
2186     } else {
2187       return null;
2188     }
2189   }
2190 
2191   /**
2192    * Get the checksum of the whole file or of a range of the file. Note that the
2193    * range always starts from the beginning of the file.
2194    * @param src The file path
2195    * @param length the length of the range, i.e., the range is [0, length]
2196    * @return The checksum
2197    * @see DistributedFileSystem#getFileChecksum(Path)
2198    */
2199   public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
2200       throws IOException {
2201     checkOpen();
2202     Preconditions.checkArgument(length >= 0);
2203     //get block locations for the file range
2204     LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
2205         length);
2206     if (null == blockLocations) {
2207       throw new FileNotFoundException("File does not exist: " + src);
2208     }
2209     List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
2210     final DataOutputBuffer md5out = new DataOutputBuffer();
2211     int bytesPerCRC = -1;
2212     DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
2213     long crcPerBlock = 0;
2214     boolean refetchBlocks = false;
2215     int lastRetriedIndex = -1;
2216 
2217     // get block checksum for each block
2218     long remaining = length;
2219     if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
2220       remaining = Math.min(length, blockLocations.getFileLength());
2221     }
2222     for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
2223       if (refetchBlocks) {  // refetch to get fresh tokens
2224         blockLocations = callGetBlockLocations(namenode, src, 0, length);
2225         if (null == blockLocations) {
2226           throw new FileNotFoundException("File does not exist: " + src);
2227         }
2228         locatedblocks = blockLocations.getLocatedBlocks();
2229         refetchBlocks = false;
2230       }
2231       LocatedBlock lb = locatedblocks.get(i);
2232       final ExtendedBlock block = lb.getBlock();
2233       if (remaining < block.getNumBytes()) {
2234         block.setNumBytes(remaining);
2235       }
2236       remaining -= block.getNumBytes();
2237       final DatanodeInfo[] datanodes = lb.getLocations();
2238 
2239       //try each datanode location of the block
2240       final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
2241       boolean done = false;
2242       for(int j = 0; !done && j < datanodes.length; j++) {
2243         DataOutputStream out = null;
2244         DataInputStream in = null;
2245 
2246         try {
2247           //connect to a datanode
2248           IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
2249           out = new DataOutputStream(new BufferedOutputStream(pair.out,
2250               HdfsConstants.SMALL_BUFFER_SIZE));
2251           in = new DataInputStream(pair.in);
2252 
2253           if (LOG.isDebugEnabled()) {
2254             LOG.debug("write to " + datanodes[j] + ": "
2255                 + Op.BLOCK_CHECKSUM + ", block=" + block);
2256           }
2257           // get block MD5
2258           new Sender(out).blockChecksum(block, lb.getBlockToken());
2259 
2260           final BlockOpResponseProto reply =
2261             BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
2262 
2263           String logInfo = "for block " + block + " from datanode " + datanodes[j];
2264           DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
2265 
2266           OpBlockChecksumResponseProto checksumData =
2267             reply.getChecksumResponse();
2268 
2269           //read byte-per-checksum
2270           final int bpc = checksumData.getBytesPerCrc();
2271           if (i == 0) { //first block
2272             bytesPerCRC = bpc;
2273           }
2274           else if (bpc != bytesPerCRC) {
2275             throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
2276                 + " but bytesPerCRC=" + bytesPerCRC);
2277           }
2278 
2279           //read crc-per-block
2280           final long cpb = checksumData.getCrcPerBlock();
2281           if (locatedblocks.size() > 1 && i == 0) {
2282             crcPerBlock = cpb;
2283           }
2284 
2285           //read md5
2286           final MD5Hash md5 = new MD5Hash(
2287               checksumData.getMd5().toByteArray());
2288           md5.write(md5out);
2289 
2290           // read crc-type
2291           final DataChecksum.Type ct;
2292           if (checksumData.hasCrcType()) {
2293             ct = PBHelper.convert(checksumData
2294                 .getCrcType());
2295           } else {
2296             LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
2297                       "inferring checksum by reading first byte");
2298             ct = inferChecksumTypeByReading(lb, datanodes[j]);
2299           }
2300 
2301           if (i == 0) { // first block
2302             crcType = ct;
2303           } else if (crcType != DataChecksum.Type.MIXED
2304               && crcType != ct) {
2305             // if crc types are mixed in a file
2306             crcType = DataChecksum.Type.MIXED;
2307           }
2308 
2309           done = true;
2310 
2311           if (LOG.isDebugEnabled()) {
2312             if (i == 0) {
2313               LOG.debug("set bytesPerCRC=" + bytesPerCRC
2314                   + ", crcPerBlock=" + crcPerBlock);
2315             }
2316             LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
2317           }
2318         } catch (InvalidBlockTokenException ibte) {
2319           if (i > lastRetriedIndex) {
2320             if (LOG.isDebugEnabled()) {
2321               LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
2322                   + "for file " + src + " for block " + block
2323                   + " from datanode " + datanodes[j]
2324                   + ". Will retry the block once.");
2325             }
2326             lastRetriedIndex = i;
2327             done = true; // actually it's not done; but we'll retry
2328             i--; // repeat at i-th block
2329             refetchBlocks = true;
2330             break;
2331           }
2332         } catch (IOException ie) {
2333           LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
2334         } finally {
2335           IOUtils.closeStream(in);
2336           IOUtils.closeStream(out);
2337         }
2338       }
2339 
2340       if (!done) {
2341         throw new IOException("Failed to get block MD5 for " + block);
2342       }
2343     }
2344 
2345     //compute file MD5
2346     final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
2347     switch (crcType) {
2348       case CRC32:
2349         return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
2350             crcPerBlock, fileMD5);
2351       case CRC32C:
2352         return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
2353             crcPerBlock, fileMD5);
2354       default:
2355         // If there is no block allocated for the file,
2356         // return one with the magic entry that matches what previous
2357         // hdfs versions return.
2358         if (locatedblocks.size() == 0) {
2359           return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
2360         }
2361 
2362         // we should never get here since the validity was checked
2363         // when getCrcType() was called above.
2364         return null;
2365     }
2366   }
2367 
2368   /**
2369    * Connect to the given datanode's data transfer port, and return
2370    * the resulting IOStreamPair. This includes encryption wrapping, etc.
2371    */
2372   private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
2373       LocatedBlock lb) throws IOException {
2374     boolean success = false;
2375     Socket sock = null;
2376     try {
2377       sock = socketFactory.createSocket();
2378       String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
2379       if (LOG.isDebugEnabled()) {
2380         LOG.debug("Connecting to datanode " + dnAddr);
2381       }
2382       NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
2383       sock.setSoTimeout(timeout);
2384 
2385       OutputStream unbufOut = NetUtils.getOutputStream(sock);
2386       InputStream unbufIn = NetUtils.getInputStream(sock);
2387       IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
2388         lb.getBlockToken(), dn);
2389       success = true;
2390       return ret;
2391     } finally {
2392       if (!success) {
2393         IOUtils.closeSocket(sock);
2394       }
2395     }
2396   }
2397 
2398   /**
2399    * Infer the checksum type for a replica by sending an OP_READ_BLOCK
2400    * for the first byte of that replica. This is used for compatibility
2401    * with older HDFS versions which did not include the checksum type in
2402    * OpBlockChecksumResponseProto.
2403    *
2404    * @param lb the located block
2405    * @param dn the connected datanode
2406    * @return the inferred checksum type
2407    * @throws IOException if an error occurs
2408    */
2409   private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
2410       throws IOException {
2411     IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
2412 
2413     try {
2414       DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
2415           HdfsConstants.SMALL_BUFFER_SIZE));
2416       DataInputStream in = new DataInputStream(pair.in);
2417 
2418       new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
2419           0, 1, true, CachingStrategy.newDefaultStrategy());
2420       final BlockOpResponseProto reply =
2421           BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
2422       String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
2423       DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
2424 
2425       return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
2426     } finally {
2427       IOUtils.cleanup(null, pair.in, pair.out);
2428     }
2429   }
2430 
2431   /**
2432    * Set permissions to a file or directory.
2433    * @param src path name.
2434    * @param permission permission to set to
2435    *
2436    * @see ClientProtocol#setPermission(String, FsPermission)
2437    */
2438   public void setPermission(String src, FsPermission permission)
2439       throws IOException {
2440     checkOpen();
2441     TraceScope scope = getPathTraceScope("setPermission", src);
2442     try {
2443       namenode.setPermission(src, permission);
2444     } catch(RemoteException re) {
2445       throw re.unwrapRemoteException(AccessControlException.class,
2446                                      FileNotFoundException.class,
2447                                      SafeModeException.class,
2448                                      UnresolvedPathException.class,
2449                                      SnapshotAccessControlException.class);
2450     } finally {
2451       scope.close();
2452     }
2453   }
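
  // Illustrative usage sketch (not part of the original source): assuming an
  // already-constructed DFSClient named "client", a POSIX-style mode can be
  // applied to a path as follows.
  //
  //   import org.apache.hadoop.fs.permission.FsPermission;
  //
  //   client.setPermission("/user/alice/data", new FsPermission((short) 0644));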

  /**
   * Set file or directory owner.
   * @param src path name.
   * @param username user id.
   * @param groupname user group.
   *
   * @see ClientProtocol#setOwner(String, String, String)
   */
  public void setOwner(String src, String username, String groupname)
      throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("setOwner", src);
    try {
      namenode.setOwner(src, username, groupname);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     SafeModeException.class,
                                     UnresolvedPathException.class,
                                     SnapshotAccessControlException.class);
    } finally {
      scope.close();
    }
  }

  private long[] callGetStats() throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("getStats", traceSampler);
    try {
      return namenode.getStats();
    } finally {
      scope.close();
    }
  }

  /**
   * @see ClientProtocol#getStats()
   */
  public FsStatus getDiskStatus() throws IOException {
    long[] rawNums = callGetStats();
    return new FsStatus(rawNums[0], rawNums[1], rawNums[2]);
  }
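
  // Illustrative usage sketch (not part of the original source): the FsStatus
  // returned by getDiskStatus() exposes aggregate capacity, used and remaining
  // space. Assuming a DFSClient named "client":
  //
  //   FsStatus status = client.getDiskStatus();
  //   long pctUsed = status.getUsed() * 100 / status.getCapacity();
  //   System.out.println("DFS used: " + pctUsed + "%");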

  /**
   * Returns count of blocks with no good replicas left. Normally should be
   * zero.
   * @throws IOException
   */
  public long getMissingBlocksCount() throws IOException {
    return callGetStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];
  }

  /**
   * Returns count of blocks with a replication factor of 1 whose only
   * replica has been lost.
   * @throws IOException
   */
  public long getMissingReplOneBlocksCount() throws IOException {
    return callGetStats()[ClientProtocol.
        GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX];
  }

  /**
   * Returns count of blocks with one or more replicas missing.
   * @throws IOException
   */
  public long getUnderReplicatedBlocksCount() throws IOException {
    return callGetStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX];
  }

  /**
   * Returns count of blocks with at least one replica marked corrupt.
   * @throws IOException
   */
  public long getCorruptBlocksCount() throws IOException {
    return callGetStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX];
  }

  /**
   * @return a list in which each entry describes a corrupt file/block
   * @throws IOException
   */
  public CorruptFileBlocks listCorruptFileBlocks(String path,
                                                 String cookie)
        throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("listCorruptFileBlocks", path);
    try {
      return namenode.listCorruptFileBlocks(path, cookie);
    } finally {
      scope.close();
    }
  }

  public DatanodeInfo[] datanodeReport(DatanodeReportType type)
      throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("datanodeReport", traceSampler);
    try {
      return namenode.getDatanodeReport(type);
    } finally {
      scope.close();
    }
  }

  public DatanodeStorageReport[] getDatanodeStorageReport(
      DatanodeReportType type) throws IOException {
    checkOpen();
    TraceScope scope =
        Trace.startSpan("datanodeStorageReport", traceSampler);
    try {
      return namenode.getDatanodeStorageReport(type);
    } finally {
      scope.close();
    }
  }

  /**
   * Enter, leave or get safe mode.
   *
   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean)
   */
  public boolean setSafeMode(SafeModeAction action) throws IOException {
    return setSafeMode(action, false);
  }

  /**
   * Enter, leave or get safe mode.
   *
   * @param action
   *          One of SafeModeAction.SAFEMODE_GET, SafeModeAction.SAFEMODE_ENTER
   *          and SafeModeAction.SAFEMODE_LEAVE
   * @param isChecked
   *          If true, then check only the active namenode's safemode status;
   *          otherwise check the first namenode's status.
   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
   */
  public boolean setSafeMode(SafeModeAction action, boolean isChecked)
      throws IOException {
    TraceScope scope =
        Trace.startSpan("setSafeMode", traceSampler);
    try {
      return namenode.setSafeMode(action, isChecked);
    } finally {
      scope.close();
    }
  }
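
  // Illustrative usage sketch (not part of the original source): querying and
  // leaving safe mode, assuming a DFSClient named "client". The enum constants
  // below come from HdfsConstants.SafeModeAction.
  //
  //   boolean inSafeMode = client.setSafeMode(SafeModeAction.SAFEMODE_GET, true);
  //   if (inSafeMode) {
  //     client.setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
  //   }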

  /**
   * Create one snapshot.
   *
   * @param snapshotRoot The directory where the snapshot is to be taken
   * @param snapshotName Name of the snapshot
   * @return the snapshot path.
   * @see ClientProtocol#createSnapshot(String, String)
   */
  public String createSnapshot(String snapshotRoot, String snapshotName)
      throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("createSnapshot", traceSampler);
    try {
      return namenode.createSnapshot(snapshotRoot, snapshotName);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }
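
  // Illustrative usage sketch (not part of the original source): taking a
  // snapshot of a directory after making it snapshottable, assuming a DFSClient
  // named "client".
  //
  //   client.allowSnapshot("/user/alice/data");
  //   String snapshotPath = client.createSnapshot("/user/alice/data", "s0");
  //   // snapshotPath is typically "/user/alice/data/.snapshot/s0"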

  /**
   * Delete a snapshot of a snapshottable directory.
   *
   * @param snapshotRoot The snapshottable directory that the
   *                    to-be-deleted snapshot belongs to
   * @param snapshotName The name of the to-be-deleted snapshot
   * @throws IOException
   * @see ClientProtocol#deleteSnapshot(String, String)
   */
  public void deleteSnapshot(String snapshotRoot, String snapshotName)
      throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("deleteSnapshot", traceSampler);
    try {
      namenode.deleteSnapshot(snapshotRoot, snapshotName);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  /**
   * Rename a snapshot.
   * @param snapshotDir The directory path where the snapshot was taken
   * @param snapshotOldName Old name of the snapshot
   * @param snapshotNewName New name of the snapshot
   * @throws IOException
   * @see ClientProtocol#renameSnapshot(String, String, String)
   */
  public void renameSnapshot(String snapshotDir, String snapshotOldName,
      String snapshotNewName) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("renameSnapshot", traceSampler);
    try {
      namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  /**
   * Get all the current snapshottable directories.
   * @return All the current snapshottable directories
   * @throws IOException
   * @see ClientProtocol#getSnapshottableDirListing()
   */
  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
      throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("getSnapshottableDirListing",
        traceSampler);
    try {
      return namenode.getSnapshottableDirListing();
    } catch(RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  /**
   * Allow snapshot on a directory.
   *
   * @see ClientProtocol#allowSnapshot(String snapshotRoot)
   */
  public void allowSnapshot(String snapshotRoot) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("allowSnapshot", traceSampler);
    try {
      namenode.allowSnapshot(snapshotRoot);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  /**
   * Disallow snapshot on a directory.
   *
   * @see ClientProtocol#disallowSnapshot(String snapshotRoot)
   */
  public void disallowSnapshot(String snapshotRoot) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("disallowSnapshot", traceSampler);
    try {
      namenode.disallowSnapshot(snapshotRoot);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  /**
   * Get the difference between two snapshots, or between a snapshot and the
   * current tree of a directory.
   * @see ClientProtocol#getSnapshotDiffReport(String, String, String)
   */
  public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir,
      String fromSnapshot, String toSnapshot) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("getSnapshotDiffReport", traceSampler);
    try {
      return namenode.getSnapshotDiffReport(snapshotDir,
          fromSnapshot, toSnapshot);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }
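
  // Illustrative usage sketch (not part of the original source): comparing two
  // existing snapshots of the same directory, assuming a DFSClient named
  // "client" and snapshots "s0" and "s1" created earlier.
  //
  //   SnapshotDiffReport report =
  //       client.getSnapshotDiffReport("/user/alice/data", "s0", "s1");
  //   System.out.println(report.toString());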

  public long addCacheDirective(
      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("addCacheDirective", traceSampler);
    try {
      return namenode.addCacheDirective(info, flags);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }
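
  // Illustrative usage sketch (not part of the original source): pinning a path
  // into the centralized cache, assuming a DFSClient named "client" and an
  // existing cache pool named "pool1".
  //
  //   CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder()
  //       .setPath(new Path("/user/alice/hot-table"))
  //       .setPool("pool1")
  //       .build();
  //   long id = client.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class));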

  public void modifyCacheDirective(
      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("modifyCacheDirective", traceSampler);
    try {
      namenode.modifyCacheDirective(info, flags);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  public void removeCacheDirective(long id)
      throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("removeCacheDirective", traceSampler);
    try {
      namenode.removeCacheDirective(id);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
      CacheDirectiveInfo filter) throws IOException {
    return new CacheDirectiveIterator(namenode, filter, traceSampler);
  }

  public void addCachePool(CachePoolInfo info) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("addCachePool", traceSampler);
    try {
      namenode.addCachePool(info);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  public void modifyCachePool(CachePoolInfo info) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("modifyCachePool", traceSampler);
    try {
      namenode.modifyCachePool(info);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  public void removeCachePool(String poolName) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("removeCachePool", traceSampler);
    try {
      namenode.removeCachePool(poolName);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException();
    } finally {
      scope.close();
    }
  }

  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
    return new CachePoolIterator(namenode, traceSampler);
  }

  /**
   * Save namespace image.
   *
   * @see ClientProtocol#saveNamespace()
   */
  void saveNamespace() throws AccessControlException, IOException {
    TraceScope scope = Trace.startSpan("saveNamespace", traceSampler);
    try {
      namenode.saveNamespace();
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class);
    } finally {
      scope.close();
    }
  }

  /**
   * Rolls the edit log on the active NameNode.
   * @return the txid of the new log segment
   *
   * @see ClientProtocol#rollEdits()
   */
  long rollEdits() throws AccessControlException, IOException {
    TraceScope scope = Trace.startSpan("rollEdits", traceSampler);
    try {
      return namenode.rollEdits();
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class);
    } finally {
      scope.close();
    }
  }

  @VisibleForTesting
  ExtendedBlock getPreviousBlock(long fileId) {
    return filesBeingWritten.get(fileId).getBlock();
  }

  /**
   * Enable/disable restore failed storage.
   *
   * @see ClientProtocol#restoreFailedStorage(String arg)
   */
  boolean restoreFailedStorage(String arg)
      throws AccessControlException, IOException {
    TraceScope scope = Trace.startSpan("restoreFailedStorage", traceSampler);
    try {
      return namenode.restoreFailedStorage(arg);
    } finally {
      scope.close();
    }
  }

  /**
   * Refresh the hosts and exclude files.  (Rereads them.)
   * See {@link ClientProtocol#refreshNodes()}
   * for more details.
   *
   * @see ClientProtocol#refreshNodes()
   */
  public void refreshNodes() throws IOException {
    TraceScope scope = Trace.startSpan("refreshNodes", traceSampler);
    try {
      namenode.refreshNodes();
    } finally {
      scope.close();
    }
  }

  /**
   * Dumps DFS data structures into specified file.
   *
   * @see ClientProtocol#metaSave(String)
   */
  public void metaSave(String pathname) throws IOException {
    TraceScope scope = Trace.startSpan("metaSave", traceSampler);
    try {
      namenode.metaSave(pathname);
    } finally {
      scope.close();
    }
  }

  /**
   * Requests the namenode to tell all datanodes to use a new, non-persistent
   * bandwidth value for dfs.balance.bandwidthPerSec.
   * See {@link ClientProtocol#setBalancerBandwidth(long)}
   * for more details.
   *
   * @see ClientProtocol#setBalancerBandwidth(long)
   */
  public void setBalancerBandwidth(long bandwidth) throws IOException {
    TraceScope scope = Trace.startSpan("setBalancerBandwidth", traceSampler);
    try {
      namenode.setBalancerBandwidth(bandwidth);
    } finally {
      scope.close();
    }
  }

  /**
   * @see ClientProtocol#finalizeUpgrade()
   */
  public void finalizeUpgrade() throws IOException {
    TraceScope scope = Trace.startSpan("finalizeUpgrade", traceSampler);
    try {
      namenode.finalizeUpgrade();
    } finally {
      scope.close();
    }
  }

  RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
      throws IOException {
    TraceScope scope = Trace.startSpan("rollingUpgrade", traceSampler);
    try {
      return namenode.rollingUpgrade(action);
    } finally {
      scope.close();
    }
  }

  /**
   * @deprecated use {@link #mkdirs(String, FsPermission, boolean)} instead.
   */
  @Deprecated
  public boolean mkdirs(String src) throws IOException {
    return mkdirs(src, null, true);
  }

  /**
   * Create a directory (or hierarchy of directories) with the given
   * name and permission.
   *
   * @param src The path of the directory being created
   * @param permission The permission of the directory being created.
   * If permission == null, use {@link FsPermission#getDefault()}.
   * @param createParent create missing parent directory if true
   *
   * @return True if the operation succeeds.
   *
   * @see ClientProtocol#mkdirs(String, FsPermission, boolean)
   */
  public boolean mkdirs(String src, FsPermission permission,
      boolean createParent) throws IOException {
    if (permission == null) {
      permission = FsPermission.getDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    return primitiveMkdir(src, masked, createParent);
  }
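
  // Illustrative usage sketch (not part of the original source): creating a
  // directory tree with an explicit mode, assuming a DFSClient named "client".
  // Passing null for the permission falls back to FsPermission.getDefault().
  //
  //   boolean created = client.mkdirs("/user/alice/logs/2015",
  //       new FsPermission((short) 0750), true);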

  /**
   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
   * that the permission has already been masked against the umask.
   */
  public boolean primitiveMkdir(String src, FsPermission absPermission)
    throws IOException {
    return primitiveMkdir(src, absPermission, true);
  }

  /**
   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
   * that the permission has already been masked against the umask.
   */
  public boolean primitiveMkdir(String src, FsPermission absPermission,
    boolean createParent)
    throws IOException {
    checkOpen();
    if (absPermission == null) {
      absPermission =
        FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + absPermission);
    }
    TraceScope scope = Trace.startSpan("mkdir", traceSampler);
    try {
      return namenode.mkdirs(src, absPermission, createParent);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     InvalidPathException.class,
                                     FileAlreadyExistsException.class,
                                     FileNotFoundException.class,
                                     ParentNotDirectoryException.class,
                                     SafeModeException.class,
                                     NSQuotaExceededException.class,
                                     DSQuotaExceededException.class,
                                     UnresolvedPathException.class,
                                     SnapshotAccessControlException.class);
    } finally {
      scope.close();
    }
  }

  /**
   * Get {@link ContentSummary} rooted at the specified directory.
   * @param src The string representation of the path
   *
   * @see ClientProtocol#getContentSummary(String)
   */
  ContentSummary getContentSummary(String src) throws IOException {
    TraceScope scope = getPathTraceScope("getContentSummary", src);
    try {
      return namenode.getContentSummary(src);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  /**
   * Sets or resets quotas for a directory.
   * @see ClientProtocol#setQuota(String, long, long, StorageType)
   */
  void setQuota(String src, long namespaceQuota, long storagespaceQuota)
      throws IOException {
    // sanity check
    if ((namespaceQuota <= 0 && namespaceQuota != HdfsConstants.QUOTA_DONT_SET &&
         namespaceQuota != HdfsConstants.QUOTA_RESET) ||
        (storagespaceQuota <= 0 && storagespaceQuota != HdfsConstants.QUOTA_DONT_SET &&
         storagespaceQuota != HdfsConstants.QUOTA_RESET)) {
      throw new IllegalArgumentException("Invalid values for quota : " +
                                         namespaceQuota + " and " +
                                         storagespaceQuota);
    }
    TraceScope scope = getPathTraceScope("setQuota", src);
    try {
      // Pass null as storage type for traditional namespace/storagespace quota.
      namenode.setQuota(src, namespaceQuota, storagespaceQuota, null);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     NSQuotaExceededException.class,
                                     DSQuotaExceededException.class,
                                     UnresolvedPathException.class,
                                     SnapshotAccessControlException.class);
    } finally {
      scope.close();
    }
  }
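
  // Illustrative usage sketch (not part of the original source): capping a
  // directory at one million names and 1 TB of storage space. setQuota is
  // package-private and is normally reached through higher-level APIs such as
  // DistributedFileSystem; the call itself, from code in this package, would
  // look like this. QUOTA_DONT_SET leaves a quota untouched; QUOTA_RESET clears it.
  //
  //   client.setQuota("/user/alice/data", 1000000L, 1024L * 1024 * 1024 * 1024);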

  /**
   * Sets or resets quotas by storage type for a directory.
   * @see ClientProtocol#setQuota(String, long, long, StorageType)
   */
  void setQuotaByStorageType(String src, StorageType type, long quota)
      throws IOException {
    if (quota <= 0 && quota != HdfsConstants.QUOTA_DONT_SET &&
        quota != HdfsConstants.QUOTA_RESET) {
      throw new IllegalArgumentException("Invalid values for quota :" +
        quota);
    }
    if (type == null) {
      throw new IllegalArgumentException("Invalid storage type(null)");
    }
    if (!type.supportTypeQuota()) {
      throw new IllegalArgumentException("Don't support Quota for storage type : "
        + type.toString());
    }
    TraceScope scope = getPathTraceScope("setQuotaByStorageType", src);
    try {
      namenode.setQuota(src, HdfsConstants.QUOTA_DONT_SET, quota, type);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class,
        QuotaByStorageTypeExceededException.class,
        UnresolvedPathException.class,
        SnapshotAccessControlException.class);
    } finally {
      scope.close();
    }
  }

  /**
   * Set the modification and access time of a file.
   *
   * @see ClientProtocol#setTimes(String, long, long)
   */
  public void setTimes(String src, long mtime, long atime) throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("setTimes", src);
    try {
      namenode.setTimes(src, mtime, atime);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class,
                                     SnapshotAccessControlException.class);
    } finally {
      scope.close();
    }
  }

  /**
   * @deprecated use {@link HdfsDataInputStream} instead.
   */
  @Deprecated
  public static class DFSDataInputStream extends HdfsDataInputStream {

    public DFSDataInputStream(DFSInputStream in) throws IOException {
      super(in);
    }
  }

  void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
    DatanodeInfo [] dnArr = { dn };
    LocatedBlock [] lblocks = { new LocatedBlock(blk, dnArr) };
    reportChecksumFailure(file, lblocks);
  }

  // just reports checksum failure and ignores any exception during the report.
  void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
    try {
      reportBadBlocks(lblocks);
    } catch (IOException ie) {
      LOG.info("Found corruption while reading " + file
          + ". Error repairing corrupt blocks. Bad blocks remain.", ie);
    }
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + "[clientName=" + clientName
        + ", ugi=" + ugi + "]";
  }

  public CachingStrategy getDefaultReadCachingStrategy() {
    return defaultReadCachingStrategy;
  }

  public CachingStrategy getDefaultWriteCachingStrategy() {
    return defaultWriteCachingStrategy;
  }

  public ClientContext getClientContext() {
    return clientContext;
  }

  public void modifyAclEntries(String src, List<AclEntry> aclSpec)
      throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("modifyAclEntries", src);
    try {
      namenode.modifyAclEntries(src, aclSpec);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     AclException.class,
                                     FileNotFoundException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     SnapshotAccessControlException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public void removeAclEntries(String src, List<AclEntry> aclSpec)
      throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("removeAclEntries", traceSampler);
    try {
      namenode.removeAclEntries(src, aclSpec);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     AclException.class,
                                     FileNotFoundException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     SnapshotAccessControlException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public void removeDefaultAcl(String src) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("removeDefaultAcl", traceSampler);
    try {
      namenode.removeDefaultAcl(src);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     AclException.class,
                                     FileNotFoundException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     SnapshotAccessControlException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public void removeAcl(String src) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("removeAcl", traceSampler);
    try {
      namenode.removeAcl(src);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     AclException.class,
                                     FileNotFoundException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     SnapshotAccessControlException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
    checkOpen();
    TraceScope scope = Trace.startSpan("setAcl", traceSampler);
    try {
      namenode.setAcl(src, aclSpec);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     AclException.class,
                                     FileNotFoundException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     SnapshotAccessControlException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public AclStatus getAclStatus(String src) throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("getAclStatus", src);
    try {
      return namenode.getAclStatus(src);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     AclException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public void createEncryptionZone(String src, String keyName)
    throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("createEncryptionZone", src);
    try {
      namenode.createEncryptionZone(src, keyName);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     SafeModeException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public EncryptionZone getEZForPath(String src)
          throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("getEZForPath", src);
    try {
      return namenode.getEZForPath(src);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public RemoteIterator<EncryptionZone> listEncryptionZones()
      throws IOException {
    checkOpen();
    return new EncryptionZoneIterator(namenode, traceSampler);
  }

  public void setXAttr(String src, String name, byte[] value,
      EnumSet<XAttrSetFlag> flag) throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("setXAttr", src);
    try {
      namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     SnapshotAccessControlException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }
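
  // Illustrative usage sketch (not part of the original source): attaching a
  // small piece of user metadata to a file, assuming a DFSClient named
  // "client". Extended attribute names are namespaced, e.g. "user.", "trusted.".
  //
  //   client.setXAttr("/user/alice/report.csv", "user.origin",
  //       "nightly-etl".getBytes(Charset.forName("UTF-8")),
  //       EnumSet.of(XAttrSetFlag.CREATE));
  //   byte[] origin = client.getXAttr("/user/alice/report.csv", "user.origin");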

  public byte[] getXAttr(String src, String name) throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("getXAttr", src);
    try {
      final List<XAttr> xAttrs = XAttrHelper.buildXAttrAsList(name);
      final List<XAttr> result = namenode.getXAttrs(src, xAttrs);
      return XAttrHelper.getFirstXAttrValue(result);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public Map<String, byte[]> getXAttrs(String src) throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("getXAttrs", src);
    try {
      return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null));
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public Map<String, byte[]> getXAttrs(String src, List<String> names)
      throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("getXAttrs", src);
    try {
      return XAttrHelper.buildXAttrMap(namenode.getXAttrs(
          src, XAttrHelper.buildXAttrs(names)));
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public List<String> listXAttrs(String src)
          throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("listXAttrs", src);
    try {
      final Map<String, byte[]> xattrs =
        XAttrHelper.buildXAttrMap(namenode.listXAttrs(src));
      return Lists.newArrayList(xattrs.keySet());
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public void removeXAttr(String src, String name) throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("removeXAttr", src);
    try {
      namenode.removeXAttr(src, XAttrHelper.buildXAttr(name));
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     SnapshotAccessControlException.class,
                                     UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public void checkAccess(String src, FsAction mode) throws IOException {
    checkOpen();
    TraceScope scope = getPathTraceScope("checkAccess", src);
    try {
      namenode.checkAccess(src, mode);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          UnresolvedPathException.class);
    } finally {
      scope.close();
    }
  }

  public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
    return new DFSInotifyEventInputStream(traceSampler, namenode);
  }

  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
      throws IOException {
    return new DFSInotifyEventInputStream(traceSampler, namenode, lastReadTxid);
  }

  @Override // RemotePeerFactory
  public Peer newConnectedPeer(InetSocketAddress addr,
      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
      throws IOException {
    Peer peer = null;
    boolean success = false;
    Socket sock = null;
    try {
      sock = socketFactory.createSocket();
      NetUtils.connect(sock, addr,
        getRandomLocalInterfaceAddr(),
        dfsClientConf.socketTimeout);
      peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
          blockToken, datanodeId);
      peer.setReadTimeout(dfsClientConf.socketTimeout);
      success = true;
      return peer;
    } finally {
      if (!success) {
        IOUtils.cleanup(LOG, peer);
        IOUtils.closeSocket(sock);
      }
    }
  }

  /**
   * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
   * it does not already exist.
   * @param num Number of threads for hedged reads thread pool.
   * If zero, skip hedged reads thread pool creation.
   */
  private synchronized void initThreadsNumForHedgedReads(int num) {
    if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
    HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
        new Daemon.DaemonFactory() {
          private final AtomicInteger threadIndex =
            new AtomicInteger(0);
          @Override
          public Thread newThread(Runnable r) {
            Thread t = super.newThread(r);
            t.setName("hedgedRead-" +
              threadIndex.getAndIncrement());
            return t;
          }
        },
        new ThreadPoolExecutor.CallerRunsPolicy() {

      @Override
      public void rejectedExecution(Runnable runnable,
          ThreadPoolExecutor e) {
        LOG.info("Execution rejected, Executing in current thread");
        HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
        // will run in the current thread
        super.rejectedExecution(runnable, e);
      }
    });
    HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Using hedged reads; pool threads=" + num);
    }
  }
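
  // Illustrative note (not part of the original source): hedged reads are
  // normally enabled from client configuration rather than by calling the
  // method above directly; the property names below are assumptions based on
  // the hedged read settings in DFSConfigKeys.
  //
  //   Configuration conf = new Configuration();
  //   conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
  //   conf.setLong("dfs.client.hedged.read.threshold.millis", 250);
  //   // A DFSClient built from this conf creates HEDGED_READ_THREAD_POOL lazily.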

  long getHedgedReadTimeout() {
    return this.hedgedReadThresholdMillis;
  }

  @VisibleForTesting
  void setHedgedReadTimeout(long timeoutMillis) {
    this.hedgedReadThresholdMillis = timeoutMillis;
  }

  ThreadPoolExecutor getHedgedReadsThreadPool() {
    return HEDGED_READ_THREAD_POOL;
  }

  boolean isHedgedReadsEnabled() {
    return (HEDGED_READ_THREAD_POOL != null) &&
      HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
  }

  DFSHedgedReadMetrics getHedgedReadMetrics() {
    return HEDGED_READ_METRIC;
  }

  public KeyProvider getKeyProvider() {
    return clientContext.getKeyProviderCache().get(conf);
  }

  @VisibleForTesting
  public void setKeyProvider(KeyProvider provider) {
    try {
      clientContext.getKeyProviderCache().setKeyProvider(conf, provider);
    } catch (IOException e) {
      LOG.error("Could not set KeyProvider !!", e);
    }
  }

  /**
   * Probe for encryption enabled on this filesystem.
   * See {@link DFSUtil#isHDFSEncryptionEnabled(Configuration)}
   * @return true if encryption is enabled
   */
  public boolean isHDFSEncryptionEnabled() {
    return DFSUtil.isHDFSEncryptionEnabled(this.conf);
  }

  /**
   * Returns the SaslDataTransferClient configured for this DFSClient.
   *
   * @return SaslDataTransferClient configured for this DFSClient
   */
  public SaslDataTransferClient getSaslDataTransferClient() {
    return saslClient;
  }

  private static final byte[] PATH = "path".getBytes(Charset.forName("UTF-8"));

  TraceScope getPathTraceScope(String description, String path) {
    TraceScope scope = Trace.startSpan(description, traceSampler);
    Span span = scope.getSpan();
    if (span != null) {
      if (path != null) {
        span.addKVAnnotation(PATH,
            path.getBytes(Charset.forName("UTF-8")));
      }
    }
    return scope;
  }
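
  // Illustrative note (not part of the original source): getPathTraceScope is
  // the tracing pattern used throughout this class for per-path operations. A
  // sketch of a new method following it:
  //
  //   TraceScope scope = getPathTraceScope("myNewOp", src);
  //   try {
  //     // ... issue the RPC against namenode ...
  //   } finally {
  //     scope.close();
  //   }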

  private static final byte[] SRC = "src".getBytes(Charset.forName("UTF-8"));

  private static final byte[] DST = "dst".getBytes(Charset.forName("UTF-8"));

  TraceScope getSrcDstTraceScope(String description, String src, String dst) {
    TraceScope scope = Trace.startSpan(description, traceSampler);
    Span span = scope.getSpan();
    if (span != null) {
      if (src != null) {
        span.addKVAnnotation(SRC,
            src.getBytes(Charset.forName("UTF-8")));
      }
      if (dst != null) {
        span.addKVAnnotation(DST,
            dst.getBytes(Charset.forName("UTF-8")));
      }
    }
    return scope;
  }
}