1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; 21 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; 22 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; 23 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; 24 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; 25 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT; 26 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY; 27 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT; 28 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY; 29 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT; 30 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; 31 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; 32 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; 33 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD; 34 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; 35 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; 36 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; 37 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; 38 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT; 39 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; 40 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT; 41 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY; 42 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT; 43 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY; 44 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; 45 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; 46 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY; 47 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT; 48 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY; 49 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE; 50 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; 51 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; 52 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; 53 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; 54 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 55 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; 56 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; 57 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL; 58 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT; 59 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; 60 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; 61 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; 62 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; 63 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; 64 65 import java.io.BufferedOutputStream; 66 import java.io.DataInputStream; 67 import java.io.DataOutputStream; 68 import java.io.FileNotFoundException; 69 import java.io.IOException; 70 import java.io.InputStream; 71 import java.io.OutputStream; 72 import java.lang.reflect.Proxy; 73 import java.net.InetAddress; 74 import java.net.InetSocketAddress; 75 import java.net.Socket; 76 import java.net.SocketAddress; 77 import java.net.URI; 78 import java.net.UnknownHostException; 79 import java.nio.charset.Charset; 80 import java.security.GeneralSecurityException; 81 import java.util.ArrayList; 82 import java.util.Collections; 83 import java.util.EnumSet; 84 import java.util.HashMap; 85 import java.util.LinkedHashMap; 86 import java.util.List; 87 import java.util.Map; 88 import java.util.Random; 89 import java.util.concurrent.SynchronousQueue; 90 import java.util.concurrent.ThreadPoolExecutor; 91 import java.util.concurrent.TimeUnit; 92 import java.util.concurrent.atomic.AtomicBoolean; 93 import java.util.concurrent.atomic.AtomicInteger; 94 95 import javax.net.SocketFactory; 96 97 import org.apache.commons.logging.Log; 98 import org.apache.commons.logging.LogFactory; 99 import org.apache.hadoop.HadoopIllegalArgumentException; 100 import org.apache.hadoop.classification.InterfaceAudience; 101 import org.apache.hadoop.conf.Configuration; 102 import org.apache.hadoop.crypto.CipherSuite; 103 import org.apache.hadoop.crypto.CryptoCodec; 104 import org.apache.hadoop.crypto.CryptoInputStream; 105 import org.apache.hadoop.crypto.CryptoOutputStream; 106 import org.apache.hadoop.crypto.CryptoProtocolVersion; 107 import org.apache.hadoop.crypto.key.KeyProvider; 108 import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 109 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; 110 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; 111 import org.apache.hadoop.fs.BlockLocation; 112 import org.apache.hadoop.fs.BlockStorageLocation; 113 import org.apache.hadoop.fs.CacheFlag; 114 import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 115 import org.apache.hadoop.fs.ContentSummary; 116 import org.apache.hadoop.fs.CreateFlag; 117 import org.apache.hadoop.fs.FileAlreadyExistsException; 118 import org.apache.hadoop.fs.FileEncryptionInfo; 119 import org.apache.hadoop.fs.FileSystem; 120 import org.apache.hadoop.fs.FsServerDefaults; 121 import org.apache.hadoop.fs.FsStatus; 122 import org.apache.hadoop.fs.HdfsBlockLocation; 123 import org.apache.hadoop.fs.InvalidPathException; 124 import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; 125 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; 126 import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; 127 import org.apache.hadoop.fs.Options; 128 import org.apache.hadoop.fs.Options.ChecksumOpt; 129 import org.apache.hadoop.fs.ParentNotDirectoryException; 130 import org.apache.hadoop.fs.Path; 131 import org.apache.hadoop.fs.StorageType; 132 import org.apache.hadoop.fs.RemoteIterator; 133 import org.apache.hadoop.fs.UnresolvedLinkException; 134 import org.apache.hadoop.fs.VolumeId; 135 import org.apache.hadoop.fs.XAttr; 136 import org.apache.hadoop.fs.XAttrSetFlag; 137 import org.apache.hadoop.fs.permission.AclEntry; 138 import org.apache.hadoop.fs.permission.AclStatus; 139 import org.apache.hadoop.fs.permission.FsAction; 140 import org.apache.hadoop.fs.permission.FsPermission; 141 import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 142 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; 143 import org.apache.hadoop.hdfs.net.Peer; 144 import org.apache.hadoop.hdfs.net.TcpPeerServer; 145 import org.apache.hadoop.hdfs.protocol.AclException; 146 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 147 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; 148 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; 149 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; 150 import org.apache.hadoop.hdfs.protocol.CachePoolEntry; 151 import org.apache.hadoop.hdfs.protocol.CachePoolInfo; 152 import org.apache.hadoop.hdfs.protocol.CachePoolIterator; 153 import org.apache.hadoop.hdfs.protocol.ClientProtocol; 154 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; 155 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; 156 import org.apache.hadoop.hdfs.protocol.DatanodeID; 157 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 158 import org.apache.hadoop.hdfs.protocol.DirectoryListing; 159 import org.apache.hadoop.hdfs.protocol.EncryptionZone; 160 import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator; 161 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 162 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; 163 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 164 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; 165 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; 166 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; 167 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 168 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; 169 import org.apache.hadoop.hdfs.protocol.LocatedBlock; 170 import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 171 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; 172 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; 173 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; 174 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; 175 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; 176 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; 177 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; 178 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; 179 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; 180 import org.apache.hadoop.hdfs.protocol.datatransfer.Op; 181 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; 182 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; 183 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 184 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; 185 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; 186 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 187 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 188 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; 189 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; 190 import org.apache.hadoop.hdfs.protocolPB.PBHelper; 191 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 192 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 193 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; 194 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; 195 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; 196 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 197 import org.apache.hadoop.hdfs.server.namenode.NameNode; 198 import org.apache.hadoop.hdfs.server.namenode.SafeModeException; 199 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; 200 import org.apache.hadoop.hdfs.util.ByteArrayManager; 201 import org.apache.hadoop.io.DataOutputBuffer; 202 import org.apache.hadoop.io.EnumSetWritable; 203 import org.apache.hadoop.io.IOUtils; 204 import org.apache.hadoop.io.MD5Hash; 205 import org.apache.hadoop.io.Text; 206 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; 207 import org.apache.hadoop.ipc.Client; 208 import org.apache.hadoop.ipc.RPC; 209 import org.apache.hadoop.ipc.RemoteException; 210 import org.apache.hadoop.ipc.RpcInvocationHandler; 211 import org.apache.hadoop.net.DNS; 212 import org.apache.hadoop.net.NetUtils; 213 import org.apache.hadoop.security.AccessControlException; 214 import org.apache.hadoop.security.UserGroupInformation; 215 import org.apache.hadoop.security.token.SecretManager.InvalidToken; 216 import org.apache.hadoop.security.token.Token; 217 import org.apache.hadoop.security.token.TokenRenewer; 218 import org.apache.hadoop.tracing.SpanReceiverHost; 219 import org.apache.hadoop.tracing.TraceUtils; 220 import org.apache.hadoop.util.Daemon; 221 import org.apache.hadoop.util.DataChecksum; 222 import org.apache.hadoop.util.DataChecksum.Type; 223 import org.apache.hadoop.util.Progressable; 224 import org.apache.hadoop.util.Time; 225 import org.apache.htrace.Sampler; 226 import org.apache.htrace.SamplerBuilder; 227 import org.apache.htrace.Span; 228 import org.apache.htrace.Trace; 229 import org.apache.htrace.TraceScope; 230 231 import com.google.common.annotations.VisibleForTesting; 232 import com.google.common.base.Joiner; 233 import com.google.common.base.Preconditions; 234 import com.google.common.collect.Lists; 235 import com.google.common.net.InetAddresses; 236 237 /******************************************************** 238 * DFSClient can connect to a Hadoop Filesystem and 239 * perform basic file tasks. It uses the ClientProtocol 240 * to communicate with a NameNode daemon, and connects 241 * directly to DataNodes to read/write block data. 242 * 243 * Hadoop DFS users should obtain an instance of 244 * DistributedFileSystem, which uses DFSClient to handle 245 * filesystem tasks. 246 * 247 ********************************************************/ 248 @InterfaceAudience.Private 249 public class DFSClient implements java.io.Closeable, RemotePeerFactory, 250 DataEncryptionKeyFactory { 251 public static final Log LOG = LogFactory.getLog(DFSClient.class); 252 public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour 253 static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB 254 255 private final Configuration conf; 256 private final Conf dfsClientConf; 257 final ClientProtocol namenode; 258 /* The service used for delegation tokens */ 259 private Text dtService; 260 261 final UserGroupInformation ugi; 262 volatile boolean clientRunning = true; 263 volatile long lastLeaseRenewal; 264 private volatile FsServerDefaults serverDefaults; 265 private volatile long serverDefaultsLastUpdate; 266 final String clientName; 267 final SocketFactory socketFactory; 268 final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure; 269 final FileSystem.Statistics stats; 270 private final String authority; 271 private final Random r = new Random(); 272 private SocketAddress[] localInterfaceAddrs; 273 private DataEncryptionKey encryptionKey; 274 final SaslDataTransferClient saslClient; 275 private final CachingStrategy defaultReadCachingStrategy; 276 private final CachingStrategy defaultWriteCachingStrategy; 277 private final ClientContext clientContext; 278 private volatile long hedgedReadThresholdMillis; 279 private static final DFSHedgedReadMetrics HEDGED_READ_METRIC = 280 new DFSHedgedReadMetrics(); 281 private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; 282 private final Sampler<?> traceSampler; 283 284 /** 285 * DFSClient configuration 286 */ 287 public static class Conf { 288 final int hdfsTimeout; // timeout value for a DFS operation. 289 290 final int maxFailoverAttempts; 291 final int maxRetryAttempts; 292 final int failoverSleepBaseMillis; 293 final int failoverSleepMaxMillis; 294 final int maxBlockAcquireFailures; 295 final int confTime; 296 final int ioBufferSize; 297 final ChecksumOpt defaultChecksumOpt; 298 final int writePacketSize; 299 final int writeMaxPackets; 300 final ByteArrayManager.Conf writeByteArrayManagerConf; 301 final int socketTimeout; 302 final int socketCacheCapacity; 303 final long socketCacheExpiry; 304 final long excludedNodesCacheExpiry; 305 /** Wait time window (in msec) if BlockMissingException is caught */ 306 final int timeWindow; 307 final int nCachedConnRetry; 308 final int nBlockWriteRetry; 309 final int nBlockWriteLocateFollowingRetry; 310 final long defaultBlockSize; 311 final long prefetchSize; 312 final short defaultReplication; 313 final String taskId; 314 final FsPermission uMask; 315 final boolean connectToDnViaHostname; 316 final boolean getHdfsBlocksMetadataEnabled; 317 final int getFileBlockStorageLocationsNumThreads; 318 final int getFileBlockStorageLocationsTimeoutMs; 319 final int retryTimesForGetLastBlockLength; 320 final int retryIntervalForGetLastBlockLength; 321 final long datanodeRestartTimeout; 322 final long dfsclientSlowIoWarningThresholdMs; 323 324 final boolean useLegacyBlockReader; 325 final boolean useLegacyBlockReaderLocal; 326 final String domainSocketPath; 327 final boolean skipShortCircuitChecksums; 328 final int shortCircuitBufferSize; 329 final boolean shortCircuitLocalReads; 330 final boolean domainSocketDataTraffic; 331 final int shortCircuitStreamsCacheSize; 332 final long shortCircuitStreamsCacheExpiryMs; 333 final int shortCircuitSharedMemoryWatcherInterruptCheckMs; 334 335 final boolean shortCircuitMmapEnabled; 336 final int shortCircuitMmapCacheSize; 337 final long shortCircuitMmapCacheExpiryMs; 338 final long shortCircuitMmapCacheRetryTimeout; 339 final long shortCircuitCacheStaleThresholdMs; 340 341 final long keyProviderCacheExpiryMs; 342 public BlockReaderFactory.FailureInjector brfFailureInjector = 343 new BlockReaderFactory.FailureInjector(); 344 Conf(Configuration conf)345 public Conf(Configuration conf) { 346 // The hdfsTimeout is currently the same as the ipc timeout 347 hdfsTimeout = Client.getTimeout(conf); 348 maxFailoverAttempts = conf.getInt( 349 DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 350 DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); 351 maxRetryAttempts = conf.getInt( 352 DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 353 DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT); 354 failoverSleepBaseMillis = conf.getInt( 355 DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, 356 DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); 357 failoverSleepMaxMillis = conf.getInt( 358 DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, 359 DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT); 360 361 maxBlockAcquireFailures = conf.getInt( 362 DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 363 DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT); 364 confTime = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 365 HdfsServerConstants.WRITE_TIMEOUT); 366 ioBufferSize = conf.getInt( 367 CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 368 CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); 369 defaultChecksumOpt = getChecksumOptFromConf(conf); 370 socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 371 HdfsServerConstants.READ_TIMEOUT); 372 /** dfs.write.packet.size is an internal config variable */ 373 writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 374 DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); 375 writeMaxPackets = conf.getInt( 376 DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY, 377 DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT); 378 379 final boolean byteArrayManagerEnabled = conf.getBoolean( 380 DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY, 381 DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT); 382 if (!byteArrayManagerEnabled) { 383 writeByteArrayManagerConf = null; 384 } else { 385 final int countThreshold = conf.getInt( 386 DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY, 387 DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT); 388 final int countLimit = conf.getInt( 389 DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY, 390 DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT); 391 final long countResetTimePeriodMs = conf.getLong( 392 DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY, 393 DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT); 394 writeByteArrayManagerConf = new ByteArrayManager.Conf( 395 countThreshold, countLimit, countResetTimePeriodMs); 396 } 397 398 399 defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, 400 DFS_BLOCK_SIZE_DEFAULT); 401 defaultReplication = (short) conf.getInt( 402 DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT); 403 taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE"); 404 socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 405 DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); 406 socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 407 DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); 408 excludedNodesCacheExpiry = conf.getLong( 409 DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL, 410 DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); 411 prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 412 10 * defaultBlockSize); 413 timeWindow = conf.getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000); 414 nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, 415 DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); 416 nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY, 417 DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT); 418 nBlockWriteLocateFollowingRetry = conf.getInt( 419 DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 420 DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); 421 uMask = FsPermission.getUMask(conf); 422 connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, 423 DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); 424 getHdfsBlocksMetadataEnabled = conf.getBoolean( 425 DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 426 DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); 427 getFileBlockStorageLocationsNumThreads = conf.getInt( 428 DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, 429 DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT); 430 getFileBlockStorageLocationsTimeoutMs = conf.getInt( 431 DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS, 432 DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT); 433 retryTimesForGetLastBlockLength = conf.getInt( 434 DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH, 435 DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); 436 retryIntervalForGetLastBlockLength = conf.getInt( 437 DFSConfigKeys.DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH, 438 DFSConfigKeys.DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT); 439 440 useLegacyBlockReader = conf.getBoolean( 441 DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, 442 DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); 443 useLegacyBlockReaderLocal = conf.getBoolean( 444 DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, 445 DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); 446 shortCircuitLocalReads = conf.getBoolean( 447 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, 448 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); 449 domainSocketDataTraffic = conf.getBoolean( 450 DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, 451 DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); 452 domainSocketPath = conf.getTrimmed( 453 DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, 454 DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT); 455 456 if (BlockReaderLocal.LOG.isDebugEnabled()) { 457 BlockReaderLocal.LOG.debug( 458 DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL 459 + " = " + useLegacyBlockReaderLocal); 460 BlockReaderLocal.LOG.debug( 461 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY 462 + " = " + shortCircuitLocalReads); 463 BlockReaderLocal.LOG.debug( 464 DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC 465 + " = " + domainSocketDataTraffic); 466 BlockReaderLocal.LOG.debug( 467 DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY 468 + " = " + domainSocketPath); 469 } 470 471 skipShortCircuitChecksums = conf.getBoolean( 472 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, 473 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); 474 shortCircuitBufferSize = conf.getInt( 475 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, 476 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT); 477 shortCircuitStreamsCacheSize = conf.getInt( 478 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 479 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT); 480 shortCircuitStreamsCacheExpiryMs = conf.getLong( 481 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, 482 DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT); 483 shortCircuitMmapEnabled = conf.getBoolean( 484 DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED, 485 DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT); 486 shortCircuitMmapCacheSize = conf.getInt( 487 DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 488 DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT); 489 shortCircuitMmapCacheExpiryMs = conf.getLong( 490 DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 491 DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT); 492 shortCircuitMmapCacheRetryTimeout = conf.getLong( 493 DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS, 494 DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT); 495 shortCircuitCacheStaleThresholdMs = conf.getLong( 496 DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS, 497 DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT); 498 shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt( 499 DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 500 DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT); 501 502 datanodeRestartTimeout = conf.getLong( 503 DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, 504 DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; 505 dfsclientSlowIoWarningThresholdMs = conf.getLong( 506 DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, 507 DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); 508 509 keyProviderCacheExpiryMs = conf.getLong( 510 DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS, 511 DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT); 512 } 513 isUseLegacyBlockReaderLocal()514 public boolean isUseLegacyBlockReaderLocal() { 515 return useLegacyBlockReaderLocal; 516 } 517 getDomainSocketPath()518 public String getDomainSocketPath() { 519 return domainSocketPath; 520 } 521 isShortCircuitLocalReads()522 public boolean isShortCircuitLocalReads() { 523 return shortCircuitLocalReads; 524 } 525 isDomainSocketDataTraffic()526 public boolean isDomainSocketDataTraffic() { 527 return domainSocketDataTraffic; 528 } 529 getChecksumType(Configuration conf)530 private DataChecksum.Type getChecksumType(Configuration conf) { 531 final String checksum = conf.get( 532 DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, 533 DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); 534 try { 535 return DataChecksum.Type.valueOf(checksum); 536 } catch(IllegalArgumentException iae) { 537 LOG.warn("Bad checksum type: " + checksum + ". Using default " 538 + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); 539 return DataChecksum.Type.valueOf( 540 DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); 541 } 542 } 543 544 // Construct a checksum option from conf getChecksumOptFromConf(Configuration conf)545 private ChecksumOpt getChecksumOptFromConf(Configuration conf) { 546 DataChecksum.Type type = getChecksumType(conf); 547 int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, 548 DFS_BYTES_PER_CHECKSUM_DEFAULT); 549 return new ChecksumOpt(type, bytesPerChecksum); 550 } 551 552 // create a DataChecksum with the default option. createChecksum()553 private DataChecksum createChecksum() throws IOException { 554 return createChecksum(null); 555 } 556 createChecksum(ChecksumOpt userOpt)557 private DataChecksum createChecksum(ChecksumOpt userOpt) { 558 // Fill in any missing field with the default. 559 ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt( 560 defaultChecksumOpt, userOpt); 561 DataChecksum dataChecksum = DataChecksum.newDataChecksum( 562 myOpt.getChecksumType(), 563 myOpt.getBytesPerChecksum()); 564 if (dataChecksum == null) { 565 throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt=" 566 + userOpt + ", default=" + defaultChecksumOpt 567 + ", effective=null"); 568 } 569 return dataChecksum; 570 } 571 } 572 getConf()573 public Conf getConf() { 574 return dfsClientConf; 575 } 576 getConfiguration()577 Configuration getConfiguration() { 578 return conf; 579 } 580 581 /** 582 * A map from file names to {@link DFSOutputStream} objects 583 * that are currently being written by this client. 584 * Note that a file can only be written by a single client. 585 */ 586 private final Map<Long, DFSOutputStream> filesBeingWritten 587 = new HashMap<Long, DFSOutputStream>(); 588 589 /** 590 * Same as this(NameNode.getAddress(conf), conf); 591 * @see #DFSClient(InetSocketAddress, Configuration) 592 * @deprecated Deprecated at 0.21 593 */ 594 @Deprecated DFSClient(Configuration conf)595 public DFSClient(Configuration conf) throws IOException { 596 this(NameNode.getAddress(conf), conf); 597 } 598 DFSClient(InetSocketAddress address, Configuration conf)599 public DFSClient(InetSocketAddress address, Configuration conf) throws IOException { 600 this(NameNode.getUri(address), conf); 601 } 602 603 /** 604 * Same as this(nameNodeUri, conf, null); 605 * @see #DFSClient(URI, Configuration, FileSystem.Statistics) 606 */ DFSClient(URI nameNodeUri, Configuration conf )607 public DFSClient(URI nameNodeUri, Configuration conf 608 ) throws IOException { 609 this(nameNodeUri, conf, null); 610 } 611 612 /** 613 * Same as this(nameNodeUri, null, conf, stats); 614 * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics) 615 */ DFSClient(URI nameNodeUri, Configuration conf, FileSystem.Statistics stats)616 public DFSClient(URI nameNodeUri, Configuration conf, 617 FileSystem.Statistics stats) 618 throws IOException { 619 this(nameNodeUri, null, conf, stats); 620 } 621 622 /** 623 * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode. 624 * If HA is enabled and a positive value is set for 625 * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the 626 * configuration, the DFSClient will use {@link LossyRetryInvocationHandler} 627 * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode 628 * must be null. 629 */ 630 @VisibleForTesting DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)631 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, 632 Configuration conf, FileSystem.Statistics stats) 633 throws IOException { 634 SpanReceiverHost.get(conf, DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX); 635 traceSampler = new SamplerBuilder(TraceUtils. 636 wrapHadoopConf(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX, conf)).build(); 637 // Copy only the required DFSClient configuration 638 this.dfsClientConf = new Conf(conf); 639 if (this.dfsClientConf.useLegacyBlockReaderLocal) { 640 LOG.debug("Using legacy short-circuit local reads."); 641 } 642 this.conf = conf; 643 this.stats = stats; 644 this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); 645 this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); 646 647 this.ugi = UserGroupInformation.getCurrentUser(); 648 649 this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); 650 this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 651 DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); 652 int numResponseToDrop = conf.getInt( 653 DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 654 DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT); 655 NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null; 656 AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false); 657 if (numResponseToDrop > 0) { 658 // This case is used for testing. 659 LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY 660 + " is set to " + numResponseToDrop 661 + ", this hacked client will proactively drop responses"); 662 proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf, 663 nameNodeUri, ClientProtocol.class, numResponseToDrop, 664 nnFallbackToSimpleAuth); 665 } 666 667 if (proxyInfo != null) { 668 this.dtService = proxyInfo.getDelegationTokenService(); 669 this.namenode = proxyInfo.getProxy(); 670 } else if (rpcNamenode != null) { 671 // This case is used for testing. 672 Preconditions.checkArgument(nameNodeUri == null); 673 this.namenode = rpcNamenode; 674 dtService = null; 675 } else { 676 Preconditions.checkArgument(nameNodeUri != null, 677 "null URI"); 678 proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, 679 ClientProtocol.class, nnFallbackToSimpleAuth); 680 this.dtService = proxyInfo.getDelegationTokenService(); 681 this.namenode = proxyInfo.getProxy(); 682 } 683 684 String localInterfaces[] = 685 conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES); 686 localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces); 687 if (LOG.isDebugEnabled() && 0 != localInterfaces.length) { 688 LOG.debug("Using local interfaces [" + 689 Joiner.on(',').join(localInterfaces)+ "] with addresses [" + 690 Joiner.on(',').join(localInterfaceAddrs) + "]"); 691 } 692 693 Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ? 694 null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false); 695 Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ? 696 null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0); 697 Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ? 698 null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false); 699 this.defaultReadCachingStrategy = 700 new CachingStrategy(readDropBehind, readahead); 701 this.defaultWriteCachingStrategy = 702 new CachingStrategy(writeDropBehind, readahead); 703 this.clientContext = ClientContext.get( 704 conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT), 705 dfsClientConf); 706 this.hedgedReadThresholdMillis = conf.getLong( 707 DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 708 DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS); 709 int numThreads = conf.getInt( 710 DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 711 DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE); 712 if (numThreads > 0) { 713 this.initThreadsNumForHedgedReads(numThreads); 714 } 715 this.saslClient = new SaslDataTransferClient( 716 conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), 717 TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); 718 } 719 720 /** 721 * Return the socket addresses to use with each configured 722 * local interface. Local interfaces may be specified by IP 723 * address, IP address range using CIDR notation, interface 724 * name (e.g. eth0) or sub-interface name (e.g. eth0:0). 725 * The socket addresses consist of the IPs for the interfaces 726 * and the ephemeral port (port 0). If an IP, IP range, or 727 * interface name matches an interface with sub-interfaces 728 * only the IP of the interface is used. Sub-interfaces can 729 * be used by specifying them explicitly (by IP or name). 730 * 731 * @return SocketAddresses for the configured local interfaces, 732 * or an empty array if none are configured 733 * @throws UnknownHostException if a given interface name is invalid 734 */ getLocalInterfaceAddrs( String interfaceNames[])735 private static SocketAddress[] getLocalInterfaceAddrs( 736 String interfaceNames[]) throws UnknownHostException { 737 List<SocketAddress> localAddrs = new ArrayList<SocketAddress>(); 738 for (String interfaceName : interfaceNames) { 739 if (InetAddresses.isInetAddress(interfaceName)) { 740 localAddrs.add(new InetSocketAddress(interfaceName, 0)); 741 } else if (NetUtils.isValidSubnet(interfaceName)) { 742 for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) { 743 localAddrs.add(new InetSocketAddress(addr, 0)); 744 } 745 } else { 746 for (String ip : DNS.getIPs(interfaceName, false)) { 747 localAddrs.add(new InetSocketAddress(ip, 0)); 748 } 749 } 750 } 751 return localAddrs.toArray(new SocketAddress[localAddrs.size()]); 752 } 753 754 /** 755 * Select one of the configured local interfaces at random. We use a random 756 * interface because other policies like round-robin are less effective 757 * given that we cache connections to datanodes. 758 * 759 * @return one of the local interface addresses at random, or null if no 760 * local interfaces are configured 761 */ getRandomLocalInterfaceAddr()762 SocketAddress getRandomLocalInterfaceAddr() { 763 if (localInterfaceAddrs.length == 0) { 764 return null; 765 } 766 final int idx = r.nextInt(localInterfaceAddrs.length); 767 final SocketAddress addr = localInterfaceAddrs[idx]; 768 if (LOG.isDebugEnabled()) { 769 LOG.debug("Using local interface " + addr); 770 } 771 return addr; 772 } 773 774 /** 775 * Return the number of times the client should go back to the namenode 776 * to retrieve block locations when reading. 777 */ getMaxBlockAcquireFailures()778 int getMaxBlockAcquireFailures() { 779 return dfsClientConf.maxBlockAcquireFailures; 780 } 781 782 /** 783 * Return the timeout that clients should use when writing to datanodes. 784 * @param numNodes the number of nodes in the pipeline. 785 */ getDatanodeWriteTimeout(int numNodes)786 int getDatanodeWriteTimeout(int numNodes) { 787 return (dfsClientConf.confTime > 0) ? 788 (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0; 789 } 790 getDatanodeReadTimeout(int numNodes)791 int getDatanodeReadTimeout(int numNodes) { 792 return dfsClientConf.socketTimeout > 0 ? 793 (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes + 794 dfsClientConf.socketTimeout) : 0; 795 } 796 getHdfsTimeout()797 int getHdfsTimeout() { 798 return dfsClientConf.hdfsTimeout; 799 } 800 801 @VisibleForTesting getClientName()802 public String getClientName() { 803 return clientName; 804 } 805 checkOpen()806 void checkOpen() throws IOException { 807 if (!clientRunning) { 808 IOException result = new IOException("Filesystem closed"); 809 throw result; 810 } 811 } 812 813 /** Return the lease renewer instance. The renewer thread won't start 814 * until the first output stream is created. The same instance will 815 * be returned until all output streams are closed. 816 */ getLeaseRenewer()817 public LeaseRenewer getLeaseRenewer() throws IOException { 818 return LeaseRenewer.getInstance(authority, ugi, this); 819 } 820 821 /** Get a lease and start automatic renewal */ beginFileLease(final long inodeId, final DFSOutputStream out)822 private void beginFileLease(final long inodeId, final DFSOutputStream out) 823 throws IOException { 824 getLeaseRenewer().put(inodeId, out, this); 825 } 826 827 /** Stop renewal of lease for the file. */ endFileLease(final long inodeId)828 void endFileLease(final long inodeId) throws IOException { 829 getLeaseRenewer().closeFile(inodeId, this); 830 } 831 832 833 /** Put a file. Only called from LeaseRenewer, where proper locking is 834 * enforced to consistently update its local dfsclients array and 835 * client's filesBeingWritten map. 836 */ putFileBeingWritten(final long inodeId, final DFSOutputStream out)837 void putFileBeingWritten(final long inodeId, final DFSOutputStream out) { 838 synchronized(filesBeingWritten) { 839 filesBeingWritten.put(inodeId, out); 840 // update the last lease renewal time only when there was no 841 // writes. once there is one write stream open, the lease renewer 842 // thread keeps it updated well with in anyone's expiration time. 843 if (lastLeaseRenewal == 0) { 844 updateLastLeaseRenewal(); 845 } 846 } 847 } 848 849 /** Remove a file. Only called from LeaseRenewer. */ removeFileBeingWritten(final long inodeId)850 void removeFileBeingWritten(final long inodeId) { 851 synchronized(filesBeingWritten) { 852 filesBeingWritten.remove(inodeId); 853 if (filesBeingWritten.isEmpty()) { 854 lastLeaseRenewal = 0; 855 } 856 } 857 } 858 859 /** Is file-being-written map empty? */ isFilesBeingWrittenEmpty()860 boolean isFilesBeingWrittenEmpty() { 861 synchronized(filesBeingWritten) { 862 return filesBeingWritten.isEmpty(); 863 } 864 } 865 866 /** @return true if the client is running */ isClientRunning()867 boolean isClientRunning() { 868 return clientRunning; 869 } 870 getLastLeaseRenewal()871 long getLastLeaseRenewal() { 872 return lastLeaseRenewal; 873 } 874 updateLastLeaseRenewal()875 void updateLastLeaseRenewal() { 876 synchronized(filesBeingWritten) { 877 if (filesBeingWritten.isEmpty()) { 878 return; 879 } 880 lastLeaseRenewal = Time.monotonicNow(); 881 } 882 } 883 884 /** 885 * Renew leases. 886 * @return true if lease was renewed. May return false if this 887 * client has been closed or has no files open. 888 **/ renewLease()889 boolean renewLease() throws IOException { 890 if (clientRunning && !isFilesBeingWrittenEmpty()) { 891 try { 892 namenode.renewLease(clientName); 893 updateLastLeaseRenewal(); 894 return true; 895 } catch (IOException e) { 896 // Abort if the lease has already expired. 897 final long elapsed = Time.monotonicNow() - getLastLeaseRenewal(); 898 if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) { 899 LOG.warn("Failed to renew lease for " + clientName + " for " 900 + (elapsed/1000) + " seconds (>= hard-limit =" 901 + (HdfsConstants.LEASE_HARDLIMIT_PERIOD/1000) + " seconds.) " 902 + "Closing all files being written ...", e); 903 closeAllFilesBeingWritten(true); 904 } else { 905 // Let the lease renewer handle it and retry. 906 throw e; 907 } 908 } 909 } 910 return false; 911 } 912 913 /** 914 * Close connections the Namenode. 915 */ closeConnectionToNamenode()916 void closeConnectionToNamenode() { 917 RPC.stopProxy(namenode); 918 } 919 920 /** Close/abort all files being written. */ closeAllFilesBeingWritten(final boolean abort)921 public void closeAllFilesBeingWritten(final boolean abort) { 922 for(;;) { 923 final long inodeId; 924 final DFSOutputStream out; 925 synchronized(filesBeingWritten) { 926 if (filesBeingWritten.isEmpty()) { 927 return; 928 } 929 inodeId = filesBeingWritten.keySet().iterator().next(); 930 out = filesBeingWritten.remove(inodeId); 931 } 932 if (out != null) { 933 try { 934 if (abort) { 935 out.abort(); 936 } else { 937 out.close(); 938 } 939 } catch(IOException ie) { 940 LOG.error("Failed to " + (abort? "abort": "close") + 941 " inode " + inodeId, ie); 942 } 943 } 944 } 945 } 946 947 /** 948 * Close the file system, abandoning all of the leases and files being 949 * created and close connections to the namenode. 950 */ 951 @Override close()952 public synchronized void close() throws IOException { 953 if(clientRunning) { 954 closeAllFilesBeingWritten(false); 955 clientRunning = false; 956 getLeaseRenewer().closeClient(this); 957 // close connections to the namenode 958 closeConnectionToNamenode(); 959 } 960 } 961 962 /** 963 * Close all open streams, abandoning all of the leases and files being 964 * created. 965 * @param abort whether streams should be gracefully closed 966 */ closeOutputStreams(boolean abort)967 public void closeOutputStreams(boolean abort) { 968 if (clientRunning) { 969 closeAllFilesBeingWritten(abort); 970 } 971 } 972 973 /** 974 * Get the default block size for this cluster 975 * @return the default block size in bytes 976 */ getDefaultBlockSize()977 public long getDefaultBlockSize() { 978 return dfsClientConf.defaultBlockSize; 979 } 980 981 /** 982 * @see ClientProtocol#getPreferredBlockSize(String) 983 */ getBlockSize(String f)984 public long getBlockSize(String f) throws IOException { 985 TraceScope scope = getPathTraceScope("getBlockSize", f); 986 try { 987 return namenode.getPreferredBlockSize(f); 988 } catch (IOException ie) { 989 LOG.warn("Problem getting block size", ie); 990 throw ie; 991 } finally { 992 scope.close(); 993 } 994 } 995 996 /** 997 * Get server default values for a number of configuration params. 998 * @see ClientProtocol#getServerDefaults() 999 */ getServerDefaults()1000 public FsServerDefaults getServerDefaults() throws IOException { 1001 long now = Time.monotonicNow(); 1002 if ((serverDefaults == null) || 1003 (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) { 1004 serverDefaults = namenode.getServerDefaults(); 1005 serverDefaultsLastUpdate = now; 1006 } 1007 assert serverDefaults != null; 1008 return serverDefaults; 1009 } 1010 1011 /** 1012 * Get a canonical token service name for this client's tokens. Null should 1013 * be returned if the client is not using tokens. 1014 * @return the token service for the client 1015 */ 1016 @InterfaceAudience.LimitedPrivate( { "HDFS" }) getCanonicalServiceName()1017 public String getCanonicalServiceName() { 1018 return (dtService != null) ? dtService.toString() : null; 1019 } 1020 1021 /** 1022 * @see ClientProtocol#getDelegationToken(Text) 1023 */ getDelegationToken(Text renewer)1024 public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) 1025 throws IOException { 1026 assert dtService != null; 1027 TraceScope scope = Trace.startSpan("getDelegationToken", traceSampler); 1028 try { 1029 Token<DelegationTokenIdentifier> token = 1030 namenode.getDelegationToken(renewer); 1031 if (token != null) { 1032 token.setService(this.dtService); 1033 LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); 1034 } else { 1035 LOG.info("Cannot get delegation token from " + renewer); 1036 } 1037 return token; 1038 } finally { 1039 scope.close(); 1040 } 1041 } 1042 1043 /** 1044 * Renew a delegation token 1045 * @param token the token to renew 1046 * @return the new expiration time 1047 * @throws InvalidToken 1048 * @throws IOException 1049 * @deprecated Use Token.renew instead. 1050 */ 1051 @Deprecated renewDelegationToken(Token<DelegationTokenIdentifier> token)1052 public long renewDelegationToken(Token<DelegationTokenIdentifier> token) 1053 throws InvalidToken, IOException { 1054 LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token)); 1055 try { 1056 return token.renew(conf); 1057 } catch (InterruptedException ie) { 1058 throw new RuntimeException("caught interrupted", ie); 1059 } catch (RemoteException re) { 1060 throw re.unwrapRemoteException(InvalidToken.class, 1061 AccessControlException.class); 1062 } 1063 } 1064 1065 private static final Map<String, Boolean> localAddrMap = Collections 1066 .synchronizedMap(new HashMap<String, Boolean>()); 1067 isLocalAddress(InetSocketAddress targetAddr)1068 public static boolean isLocalAddress(InetSocketAddress targetAddr) { 1069 InetAddress addr = targetAddr.getAddress(); 1070 Boolean cached = localAddrMap.get(addr.getHostAddress()); 1071 if (cached != null) { 1072 if (LOG.isTraceEnabled()) { 1073 LOG.trace("Address " + targetAddr + 1074 (cached ? " is local" : " is not local")); 1075 } 1076 return cached; 1077 } 1078 1079 boolean local = NetUtils.isLocalAddress(addr); 1080 1081 if (LOG.isTraceEnabled()) { 1082 LOG.trace("Address " + targetAddr + 1083 (local ? " is local" : " is not local")); 1084 } 1085 localAddrMap.put(addr.getHostAddress(), local); 1086 return local; 1087 } 1088 1089 /** 1090 * Cancel a delegation token 1091 * @param token the token to cancel 1092 * @throws InvalidToken 1093 * @throws IOException 1094 * @deprecated Use Token.cancel instead. 1095 */ 1096 @Deprecated cancelDelegationToken(Token<DelegationTokenIdentifier> token)1097 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) 1098 throws InvalidToken, IOException { 1099 LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token)); 1100 try { 1101 token.cancel(conf); 1102 } catch (InterruptedException ie) { 1103 throw new RuntimeException("caught interrupted", ie); 1104 } catch (RemoteException re) { 1105 throw re.unwrapRemoteException(InvalidToken.class, 1106 AccessControlException.class); 1107 } 1108 } 1109 1110 @InterfaceAudience.Private 1111 public static class Renewer extends TokenRenewer { 1112 1113 static { 1114 //Ensure that HDFS Configuration files are loaded before trying to use 1115 // the renewer. HdfsConfiguration.init()1116 HdfsConfiguration.init(); 1117 } 1118 1119 @Override handleKind(Text kind)1120 public boolean handleKind(Text kind) { 1121 return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind); 1122 } 1123 1124 @SuppressWarnings("unchecked") 1125 @Override renew(Token<?> token, Configuration conf)1126 public long renew(Token<?> token, Configuration conf) throws IOException { 1127 Token<DelegationTokenIdentifier> delToken = 1128 (Token<DelegationTokenIdentifier>) token; 1129 ClientProtocol nn = getNNProxy(delToken, conf); 1130 try { 1131 return nn.renewDelegationToken(delToken); 1132 } catch (RemoteException re) { 1133 throw re.unwrapRemoteException(InvalidToken.class, 1134 AccessControlException.class); 1135 } 1136 } 1137 1138 @SuppressWarnings("unchecked") 1139 @Override cancel(Token<?> token, Configuration conf)1140 public void cancel(Token<?> token, Configuration conf) throws IOException { 1141 Token<DelegationTokenIdentifier> delToken = 1142 (Token<DelegationTokenIdentifier>) token; 1143 LOG.info("Cancelling " + 1144 DelegationTokenIdentifier.stringifyToken(delToken)); 1145 ClientProtocol nn = getNNProxy(delToken, conf); 1146 try { 1147 nn.cancelDelegationToken(delToken); 1148 } catch (RemoteException re) { 1149 throw re.unwrapRemoteException(InvalidToken.class, 1150 AccessControlException.class); 1151 } 1152 } 1153 getNNProxy( Token<DelegationTokenIdentifier> token, Configuration conf)1154 private static ClientProtocol getNNProxy( 1155 Token<DelegationTokenIdentifier> token, Configuration conf) 1156 throws IOException { 1157 URI uri = HAUtil.getServiceUriFromToken(HdfsConstants.HDFS_URI_SCHEME, 1158 token); 1159 if (HAUtil.isTokenForLogicalUri(token) && 1160 !HAUtil.isLogicalUri(conf, uri)) { 1161 // If the token is for a logical nameservice, but the configuration 1162 // we have disagrees about that, we can't actually renew it. 1163 // This can be the case in MR, for example, if the RM doesn't 1164 // have all of the HA clusters configured in its configuration. 1165 throw new IOException("Unable to map logical nameservice URI '" + 1166 uri + "' to a NameNode. Local configuration does not have " + 1167 "a failover proxy provider configured."); 1168 } 1169 1170 NameNodeProxies.ProxyAndInfo<ClientProtocol> info = 1171 NameNodeProxies.createProxy(conf, uri, ClientProtocol.class); 1172 assert info.getDelegationTokenService().equals(token.getService()) : 1173 "Returned service '" + info.getDelegationTokenService().toString() + 1174 "' doesn't match expected service '" + 1175 token.getService().toString() + "'"; 1176 1177 return info.getProxy(); 1178 } 1179 1180 @Override isManaged(Token<?> token)1181 public boolean isManaged(Token<?> token) throws IOException { 1182 return true; 1183 } 1184 1185 } 1186 1187 /** 1188 * Report corrupt blocks that were discovered by the client. 1189 * @see ClientProtocol#reportBadBlocks(LocatedBlock[]) 1190 */ reportBadBlocks(LocatedBlock[] blocks)1191 public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { 1192 namenode.reportBadBlocks(blocks); 1193 } 1194 getDefaultReplication()1195 public short getDefaultReplication() { 1196 return dfsClientConf.defaultReplication; 1197 } 1198 getLocatedBlocks(String src, long start)1199 public LocatedBlocks getLocatedBlocks(String src, long start) 1200 throws IOException { 1201 return getLocatedBlocks(src, start, dfsClientConf.prefetchSize); 1202 } 1203 1204 /* 1205 * This is just a wrapper around callGetBlockLocations, but non-static so that 1206 * we can stub it out for tests. 1207 */ 1208 @VisibleForTesting getLocatedBlocks(String src, long start, long length)1209 public LocatedBlocks getLocatedBlocks(String src, long start, long length) 1210 throws IOException { 1211 TraceScope scope = getPathTraceScope("getBlockLocations", src); 1212 try { 1213 return callGetBlockLocations(namenode, src, start, length); 1214 } finally { 1215 scope.close(); 1216 } 1217 } 1218 1219 /** 1220 * @see ClientProtocol#getBlockLocations(String, long, long) 1221 */ callGetBlockLocations(ClientProtocol namenode, String src, long start, long length)1222 static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, 1223 String src, long start, long length) 1224 throws IOException { 1225 try { 1226 return namenode.getBlockLocations(src, start, length); 1227 } catch(RemoteException re) { 1228 throw re.unwrapRemoteException(AccessControlException.class, 1229 FileNotFoundException.class, 1230 UnresolvedPathException.class); 1231 } 1232 } 1233 1234 /** 1235 * Recover a file's lease 1236 * @param src a file's path 1237 * @return true if the file is already closed 1238 * @throws IOException 1239 */ recoverLease(String src)1240 boolean recoverLease(String src) throws IOException { 1241 checkOpen(); 1242 1243 TraceScope scope = getPathTraceScope("recoverLease", src); 1244 try { 1245 return namenode.recoverLease(src, clientName); 1246 } catch (RemoteException re) { 1247 throw re.unwrapRemoteException(FileNotFoundException.class, 1248 AccessControlException.class, 1249 UnresolvedPathException.class); 1250 } finally { 1251 scope.close(); 1252 } 1253 } 1254 1255 /** 1256 * Get block location info about file 1257 * 1258 * getBlockLocations() returns a list of hostnames that store 1259 * data for a specific file region. It returns a set of hostnames 1260 * for every block within the indicated region. 1261 * 1262 * This function is very useful when writing code that considers 1263 * data-placement when performing operations. For example, the 1264 * MapReduce system tries to schedule tasks on the same machines 1265 * as the data-block the task processes. 1266 */ getBlockLocations(String src, long start, long length)1267 public BlockLocation[] getBlockLocations(String src, long start, 1268 long length) throws IOException, UnresolvedLinkException { 1269 TraceScope scope = getPathTraceScope("getBlockLocations", src); 1270 try { 1271 LocatedBlocks blocks = getLocatedBlocks(src, start, length); 1272 BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks); 1273 HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length]; 1274 for (int i = 0; i < locations.length; i++) { 1275 hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i)); 1276 } 1277 return hdfsLocations; 1278 } finally { 1279 scope.close(); 1280 } 1281 } 1282 1283 /** 1284 * Get block location information about a list of {@link HdfsBlockLocation}. 1285 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to 1286 * get {@link BlockStorageLocation}s for blocks returned by 1287 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} 1288 * . 1289 * 1290 * This is done by making a round of RPCs to the associated datanodes, asking 1291 * the volume of each block replica. The returned array of 1292 * {@link BlockStorageLocation} expose this information as a 1293 * {@link VolumeId}. 1294 * 1295 * @param blockLocations 1296 * target blocks on which to query volume location information 1297 * @return volumeBlockLocations original block array augmented with additional 1298 * volume location information for each replica. 1299 */ getBlockStorageLocations( List<BlockLocation> blockLocations)1300 public BlockStorageLocation[] getBlockStorageLocations( 1301 List<BlockLocation> blockLocations) throws IOException, 1302 UnsupportedOperationException, InvalidBlockTokenException { 1303 if (!getConf().getHdfsBlocksMetadataEnabled) { 1304 throw new UnsupportedOperationException("Datanode-side support for " + 1305 "getVolumeBlockLocations() must also be enabled in the client " + 1306 "configuration."); 1307 } 1308 // Downcast blockLocations and fetch out required LocatedBlock(s) 1309 List<LocatedBlock> blocks = new ArrayList<LocatedBlock>(); 1310 for (BlockLocation loc : blockLocations) { 1311 if (!(loc instanceof HdfsBlockLocation)) { 1312 throw new ClassCastException("DFSClient#getVolumeBlockLocations " + 1313 "expected to be passed HdfsBlockLocations"); 1314 } 1315 HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; 1316 blocks.add(hdfsLoc.getLocatedBlock()); 1317 } 1318 1319 // Re-group the LocatedBlocks to be grouped by datanodes, with the values 1320 // a list of the LocatedBlocks on the datanode. 1321 Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 1322 new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>(); 1323 for (LocatedBlock b : blocks) { 1324 for (DatanodeInfo info : b.getLocations()) { 1325 if (!datanodeBlocks.containsKey(info)) { 1326 datanodeBlocks.put(info, new ArrayList<LocatedBlock>()); 1327 } 1328 List<LocatedBlock> l = datanodeBlocks.get(info); 1329 l.add(b); 1330 } 1331 } 1332 1333 // Make RPCs to the datanodes to get volume locations for its replicas 1334 TraceScope scope = 1335 Trace.startSpan("getBlockStorageLocations", traceSampler); 1336 Map<DatanodeInfo, HdfsBlocksMetadata> metadatas; 1337 try { 1338 metadatas = BlockStorageLocationUtil. 1339 queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, 1340 getConf().getFileBlockStorageLocationsNumThreads, 1341 getConf().getFileBlockStorageLocationsTimeoutMs, 1342 getConf().connectToDnViaHostname); 1343 if (LOG.isTraceEnabled()) { 1344 LOG.trace("metadata returned: " 1345 + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); 1346 } 1347 } finally { 1348 scope.close(); 1349 } 1350 1351 // Regroup the returned VolumeId metadata to again be grouped by 1352 // LocatedBlock rather than by datanode 1353 Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil 1354 .associateVolumeIdsWithBlocks(blocks, metadatas); 1355 1356 // Combine original BlockLocations with new VolumeId information 1357 BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil 1358 .convertToVolumeBlockLocations(blocks, blockVolumeIds); 1359 1360 return volumeBlockLocations; 1361 } 1362 1363 /** 1364 * Decrypts a EDEK by consulting the KeyProvider. 1365 */ decryptEncryptedDataEncryptionKey(FileEncryptionInfo feInfo)1366 private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo 1367 feInfo) throws IOException { 1368 TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler); 1369 try { 1370 KeyProvider provider = getKeyProvider(); 1371 if (provider == null) { 1372 throw new IOException("No KeyProvider is configured, cannot access" + 1373 " an encrypted file"); 1374 } 1375 EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption( 1376 feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(), 1377 feInfo.getEncryptedDataEncryptionKey()); 1378 try { 1379 KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension 1380 .createKeyProviderCryptoExtension(provider); 1381 return cryptoProvider.decryptEncryptedKey(ekv); 1382 } catch (GeneralSecurityException e) { 1383 throw new IOException(e); 1384 } 1385 } finally { 1386 scope.close(); 1387 } 1388 } 1389 1390 /** 1391 * Obtain the crypto protocol version from the provided FileEncryptionInfo, 1392 * checking to see if this version is supported by. 1393 * 1394 * @param feInfo FileEncryptionInfo 1395 * @return CryptoProtocolVersion from the feInfo 1396 * @throws IOException if the protocol version is unsupported. 1397 */ getCryptoProtocolVersion(FileEncryptionInfo feInfo)1398 private static CryptoProtocolVersion getCryptoProtocolVersion 1399 (FileEncryptionInfo feInfo) throws IOException { 1400 final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion(); 1401 if (!CryptoProtocolVersion.supports(version)) { 1402 throw new IOException("Client does not support specified " + 1403 "CryptoProtocolVersion " + version.getDescription() + " version " + 1404 "number" + version.getVersion()); 1405 } 1406 return version; 1407 } 1408 1409 /** 1410 * Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo 1411 * and the available CryptoCodecs configured in the Configuration. 1412 * 1413 * @param conf Configuration 1414 * @param feInfo FileEncryptionInfo 1415 * @return CryptoCodec 1416 * @throws IOException if no suitable CryptoCodec for the CipherSuite is 1417 * available. 1418 */ getCryptoCodec(Configuration conf, FileEncryptionInfo feInfo)1419 private static CryptoCodec getCryptoCodec(Configuration conf, 1420 FileEncryptionInfo feInfo) throws IOException { 1421 final CipherSuite suite = feInfo.getCipherSuite(); 1422 if (suite.equals(CipherSuite.UNKNOWN)) { 1423 throw new IOException("NameNode specified unknown CipherSuite with ID " 1424 + suite.getUnknownValue() + ", cannot instantiate CryptoCodec."); 1425 } 1426 final CryptoCodec codec = CryptoCodec.getInstance(conf, suite); 1427 if (codec == null) { 1428 throw new UnknownCipherSuiteException( 1429 "No configuration found for the cipher suite " 1430 + suite.getConfigSuffix() + " prefixed with " 1431 + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX 1432 + ". Please see the example configuration " 1433 + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE " 1434 + "at core-default.xml for details."); 1435 } 1436 return codec; 1437 } 1438 1439 /** 1440 * Wraps the stream in a CryptoInputStream if the underlying file is 1441 * encrypted. 1442 */ createWrappedInputStream(DFSInputStream dfsis)1443 public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis) 1444 throws IOException { 1445 final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo(); 1446 if (feInfo != null) { 1447 // File is encrypted, wrap the stream in a crypto stream. 1448 // Currently only one version, so no special logic based on the version # 1449 getCryptoProtocolVersion(feInfo); 1450 final CryptoCodec codec = getCryptoCodec(conf, feInfo); 1451 final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); 1452 final CryptoInputStream cryptoIn = 1453 new CryptoInputStream(dfsis, codec, decrypted.getMaterial(), 1454 feInfo.getIV()); 1455 return new HdfsDataInputStream(cryptoIn); 1456 } else { 1457 // No FileEncryptionInfo so no encryption. 1458 return new HdfsDataInputStream(dfsis); 1459 } 1460 } 1461 1462 /** 1463 * Wraps the stream in a CryptoOutputStream if the underlying file is 1464 * encrypted. 1465 */ createWrappedOutputStream(DFSOutputStream dfsos, FileSystem.Statistics statistics)1466 public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, 1467 FileSystem.Statistics statistics) throws IOException { 1468 return createWrappedOutputStream(dfsos, statistics, 0); 1469 } 1470 1471 /** 1472 * Wraps the stream in a CryptoOutputStream if the underlying file is 1473 * encrypted. 1474 */ createWrappedOutputStream(DFSOutputStream dfsos, FileSystem.Statistics statistics, long startPos)1475 public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, 1476 FileSystem.Statistics statistics, long startPos) throws IOException { 1477 final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo(); 1478 if (feInfo != null) { 1479 // File is encrypted, wrap the stream in a crypto stream. 1480 // Currently only one version, so no special logic based on the version # 1481 getCryptoProtocolVersion(feInfo); 1482 final CryptoCodec codec = getCryptoCodec(conf, feInfo); 1483 KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); 1484 final CryptoOutputStream cryptoOut = 1485 new CryptoOutputStream(dfsos, codec, 1486 decrypted.getMaterial(), feInfo.getIV(), startPos); 1487 return new HdfsDataOutputStream(cryptoOut, statistics, startPos); 1488 } else { 1489 // No FileEncryptionInfo present so no encryption. 1490 return new HdfsDataOutputStream(dfsos, statistics, startPos); 1491 } 1492 } 1493 open(String src)1494 public DFSInputStream open(String src) 1495 throws IOException, UnresolvedLinkException { 1496 return open(src, dfsClientConf.ioBufferSize, true, null); 1497 } 1498 1499 /** 1500 * Create an input stream that obtains a nodelist from the 1501 * namenode, and then reads from all the right places. Creates 1502 * inner subclass of InputStream that does the right out-of-band 1503 * work. 1504 * @deprecated Use {@link #open(String, int, boolean)} instead. 1505 */ 1506 @Deprecated open(String src, int buffersize, boolean verifyChecksum, FileSystem.Statistics stats)1507 public DFSInputStream open(String src, int buffersize, boolean verifyChecksum, 1508 FileSystem.Statistics stats) 1509 throws IOException, UnresolvedLinkException { 1510 return open(src, buffersize, verifyChecksum); 1511 } 1512 1513 1514 /** 1515 * Create an input stream that obtains a nodelist from the 1516 * namenode, and then reads from all the right places. Creates 1517 * inner subclass of InputStream that does the right out-of-band 1518 * work. 1519 */ open(String src, int buffersize, boolean verifyChecksum)1520 public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) 1521 throws IOException, UnresolvedLinkException { 1522 checkOpen(); 1523 // Get block info from namenode 1524 TraceScope scope = getPathTraceScope("newDFSInputStream", src); 1525 try { 1526 return new DFSInputStream(this, src, verifyChecksum); 1527 } finally { 1528 scope.close(); 1529 } 1530 } 1531 1532 /** 1533 * Get the namenode associated with this DFSClient object 1534 * @return the namenode associated with this DFSClient object 1535 */ getNamenode()1536 public ClientProtocol getNamenode() { 1537 return namenode; 1538 } 1539 1540 /** 1541 * Call {@link #create(String, boolean, short, long, Progressable)} with 1542 * default <code>replication</code> and <code>blockSize<code> and null <code> 1543 * progress</code>. 1544 */ create(String src, boolean overwrite)1545 public OutputStream create(String src, boolean overwrite) 1546 throws IOException { 1547 return create(src, overwrite, dfsClientConf.defaultReplication, 1548 dfsClientConf.defaultBlockSize, null); 1549 } 1550 1551 /** 1552 * Call {@link #create(String, boolean, short, long, Progressable)} with 1553 * default <code>replication</code> and <code>blockSize<code>. 1554 */ create(String src, boolean overwrite, Progressable progress)1555 public OutputStream create(String src, 1556 boolean overwrite, 1557 Progressable progress) throws IOException { 1558 return create(src, overwrite, dfsClientConf.defaultReplication, 1559 dfsClientConf.defaultBlockSize, progress); 1560 } 1561 1562 /** 1563 * Call {@link #create(String, boolean, short, long, Progressable)} with 1564 * null <code>progress</code>. 1565 */ create(String src, boolean overwrite, short replication, long blockSize)1566 public OutputStream create(String src, 1567 boolean overwrite, 1568 short replication, 1569 long blockSize) throws IOException { 1570 return create(src, overwrite, replication, blockSize, null); 1571 } 1572 1573 /** 1574 * Call {@link #create(String, boolean, short, long, Progressable, int)} 1575 * with default bufferSize. 1576 */ create(String src, boolean overwrite, short replication, long blockSize, Progressable progress)1577 public OutputStream create(String src, boolean overwrite, short replication, 1578 long blockSize, Progressable progress) throws IOException { 1579 return create(src, overwrite, replication, blockSize, progress, 1580 dfsClientConf.ioBufferSize); 1581 } 1582 1583 /** 1584 * Call {@link #create(String, FsPermission, EnumSet, short, long, 1585 * Progressable, int, ChecksumOpt)} with default <code>permission</code> 1586 * {@link FsPermission#getFileDefault()}. 1587 * 1588 * @param src File name 1589 * @param overwrite overwrite an existing file if true 1590 * @param replication replication factor for the file 1591 * @param blockSize maximum block size 1592 * @param progress interface for reporting client progress 1593 * @param buffersize underlying buffersize 1594 * 1595 * @return output stream 1596 */ create(String src, boolean overwrite, short replication, long blockSize, Progressable progress, int buffersize)1597 public OutputStream create(String src, 1598 boolean overwrite, 1599 short replication, 1600 long blockSize, 1601 Progressable progress, 1602 int buffersize) 1603 throws IOException { 1604 return create(src, FsPermission.getFileDefault(), 1605 overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) 1606 : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress, 1607 buffersize, null); 1608 } 1609 1610 /** 1611 * Call {@link #create(String, FsPermission, EnumSet, boolean, short, 1612 * long, Progressable, int, ChecksumOpt)} with <code>createParent</code> 1613 * set to true. 1614 */ create(String src, FsPermission permission, EnumSet<CreateFlag> flag, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt)1615 public DFSOutputStream create(String src, 1616 FsPermission permission, 1617 EnumSet<CreateFlag> flag, 1618 short replication, 1619 long blockSize, 1620 Progressable progress, 1621 int buffersize, 1622 ChecksumOpt checksumOpt) 1623 throws IOException { 1624 return create(src, permission, flag, true, 1625 replication, blockSize, progress, buffersize, checksumOpt, null); 1626 } 1627 1628 /** 1629 * Create a new dfs file with the specified block replication 1630 * with write-progress reporting and return an output stream for writing 1631 * into the file. 1632 * 1633 * @param src File name 1634 * @param permission The permission of the directory being created. 1635 * If null, use default permission {@link FsPermission#getFileDefault()} 1636 * @param flag indicates create a new file or create/overwrite an 1637 * existing file or append to an existing file 1638 * @param createParent create missing parent directory if true 1639 * @param replication block replication 1640 * @param blockSize maximum block size 1641 * @param progress interface for reporting client progress 1642 * @param buffersize underlying buffer size 1643 * @param checksumOpt checksum options 1644 * 1645 * @return output stream 1646 * 1647 * @see ClientProtocol#create for detailed description of exceptions thrown 1648 */ create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt)1649 public DFSOutputStream create(String src, 1650 FsPermission permission, 1651 EnumSet<CreateFlag> flag, 1652 boolean createParent, 1653 short replication, 1654 long blockSize, 1655 Progressable progress, 1656 int buffersize, 1657 ChecksumOpt checksumOpt) throws IOException { 1658 return create(src, permission, flag, createParent, replication, blockSize, 1659 progress, buffersize, checksumOpt, null); 1660 } 1661 1662 /** 1663 * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, 1664 * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is 1665 * a hint to where the namenode should place the file blocks. 1666 * The favored nodes hint is not persisted in HDFS. Hence it may be honored 1667 * at the creation time only. HDFS could move the blocks during balancing or 1668 * replication, to move the blocks from favored nodes. A value of null means 1669 * no favored nodes for this create 1670 */ create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)1671 public DFSOutputStream create(String src, 1672 FsPermission permission, 1673 EnumSet<CreateFlag> flag, 1674 boolean createParent, 1675 short replication, 1676 long blockSize, 1677 Progressable progress, 1678 int buffersize, 1679 ChecksumOpt checksumOpt, 1680 InetSocketAddress[] favoredNodes) throws IOException { 1681 checkOpen(); 1682 if (permission == null) { 1683 permission = FsPermission.getFileDefault(); 1684 } 1685 FsPermission masked = permission.applyUMask(dfsClientConf.uMask); 1686 if(LOG.isDebugEnabled()) { 1687 LOG.debug(src + ": masked=" + masked); 1688 } 1689 final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, 1690 src, masked, flag, createParent, replication, blockSize, progress, 1691 buffersize, dfsClientConf.createChecksum(checksumOpt), 1692 getFavoredNodesStr(favoredNodes)); 1693 beginFileLease(result.getFileId(), result); 1694 return result; 1695 } 1696 getFavoredNodesStr(InetSocketAddress[] favoredNodes)1697 private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) { 1698 String[] favoredNodeStrs = null; 1699 if (favoredNodes != null) { 1700 favoredNodeStrs = new String[favoredNodes.length]; 1701 for (int i = 0; i < favoredNodes.length; i++) { 1702 favoredNodeStrs[i] = 1703 favoredNodes[i].getHostName() + ":" 1704 + favoredNodes[i].getPort(); 1705 } 1706 } 1707 return favoredNodeStrs; 1708 } 1709 1710 /** 1711 * Append to an existing file if {@link CreateFlag#APPEND} is present 1712 */ primitiveAppend(String src, EnumSet<CreateFlag> flag, int buffersize, Progressable progress)1713 private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag, 1714 int buffersize, Progressable progress) throws IOException { 1715 if (flag.contains(CreateFlag.APPEND)) { 1716 HdfsFileStatus stat = getFileInfo(src); 1717 if (stat == null) { // No file to append to 1718 // New file needs to be created if create option is present 1719 if (!flag.contains(CreateFlag.CREATE)) { 1720 throw new FileNotFoundException("failed to append to non-existent file " 1721 + src + " on client " + clientName); 1722 } 1723 return null; 1724 } 1725 return callAppend(src, buffersize, flag, progress, null); 1726 } 1727 return null; 1728 } 1729 1730 /** 1731 * Same as {{@link #create(String, FsPermission, EnumSet, short, long, 1732 * Progressable, int, ChecksumOpt)} except that the permission 1733 * is absolute (ie has already been masked with umask. 1734 */ primitiveCreate(String src, FsPermission absPermission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt)1735 public DFSOutputStream primitiveCreate(String src, 1736 FsPermission absPermission, 1737 EnumSet<CreateFlag> flag, 1738 boolean createParent, 1739 short replication, 1740 long blockSize, 1741 Progressable progress, 1742 int buffersize, 1743 ChecksumOpt checksumOpt) 1744 throws IOException, UnresolvedLinkException { 1745 checkOpen(); 1746 CreateFlag.validate(flag); 1747 DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress); 1748 if (result == null) { 1749 DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); 1750 result = DFSOutputStream.newStreamForCreate(this, src, absPermission, 1751 flag, createParent, replication, blockSize, progress, buffersize, 1752 checksum, null); 1753 } 1754 beginFileLease(result.getFileId(), result); 1755 return result; 1756 } 1757 1758 /** 1759 * Creates a symbolic link. 1760 * 1761 * @see ClientProtocol#createSymlink(String, String,FsPermission, boolean) 1762 */ createSymlink(String target, String link, boolean createParent)1763 public void createSymlink(String target, String link, boolean createParent) 1764 throws IOException { 1765 TraceScope scope = getPathTraceScope("createSymlink", target); 1766 try { 1767 FsPermission dirPerm = 1768 FsPermission.getDefault().applyUMask(dfsClientConf.uMask); 1769 namenode.createSymlink(target, link, dirPerm, createParent); 1770 } catch (RemoteException re) { 1771 throw re.unwrapRemoteException(AccessControlException.class, 1772 FileAlreadyExistsException.class, 1773 FileNotFoundException.class, 1774 ParentNotDirectoryException.class, 1775 NSQuotaExceededException.class, 1776 DSQuotaExceededException.class, 1777 UnresolvedPathException.class, 1778 SnapshotAccessControlException.class); 1779 } finally { 1780 scope.close(); 1781 } 1782 } 1783 1784 /** 1785 * Resolve the *first* symlink, if any, in the path. 1786 * 1787 * @see ClientProtocol#getLinkTarget(String) 1788 */ getLinkTarget(String path)1789 public String getLinkTarget(String path) throws IOException { 1790 checkOpen(); 1791 TraceScope scope = getPathTraceScope("getLinkTarget", path); 1792 try { 1793 return namenode.getLinkTarget(path); 1794 } catch (RemoteException re) { 1795 throw re.unwrapRemoteException(AccessControlException.class, 1796 FileNotFoundException.class); 1797 } finally { 1798 scope.close(); 1799 } 1800 } 1801 1802 /** Method to get stream returned by append call */ callAppend(String src, int buffersize, EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)1803 private DFSOutputStream callAppend(String src, int buffersize, 1804 EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes) 1805 throws IOException { 1806 CreateFlag.validateForAppend(flag); 1807 try { 1808 LastBlockWithStatus blkWithStatus = namenode.append(src, clientName, 1809 new EnumSetWritable<>(flag, CreateFlag.class)); 1810 HdfsFileStatus status = blkWithStatus.getFileStatus(); 1811 if (status == null) { 1812 DFSClient.LOG.debug("NameNode is on an older version, request file " + 1813 "info with additional RPC call for file: " + src); 1814 status = getFileInfo(src); 1815 } 1816 return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize, 1817 progress, blkWithStatus.getLastBlock(), 1818 status, dfsClientConf.createChecksum(), 1819 favoredNodes); 1820 } catch(RemoteException re) { 1821 throw re.unwrapRemoteException(AccessControlException.class, 1822 FileNotFoundException.class, 1823 SafeModeException.class, 1824 DSQuotaExceededException.class, 1825 UnsupportedOperationException.class, 1826 UnresolvedPathException.class, 1827 SnapshotAccessControlException.class); 1828 } 1829 } 1830 1831 /** 1832 * Append to an existing HDFS file. 1833 * 1834 * @param src file name 1835 * @param buffersize buffer size 1836 * @param flag indicates whether to append data to a new block instead of 1837 * the last block 1838 * @param progress for reporting write-progress; null is acceptable. 1839 * @param statistics file system statistics; null is acceptable. 1840 * @return an output stream for writing into the file 1841 * 1842 * @see ClientProtocol#append(String, String, EnumSetWritable) 1843 */ append(final String src, final int buffersize, EnumSet<CreateFlag> flag, final Progressable progress, final FileSystem.Statistics statistics)1844 public HdfsDataOutputStream append(final String src, final int buffersize, 1845 EnumSet<CreateFlag> flag, final Progressable progress, 1846 final FileSystem.Statistics statistics) throws IOException { 1847 final DFSOutputStream out = append(src, buffersize, flag, null, progress); 1848 return createWrappedOutputStream(out, statistics, out.getInitialLen()); 1849 } 1850 1851 /** 1852 * Append to an existing HDFS file. 1853 * 1854 * @param src file name 1855 * @param buffersize buffer size 1856 * @param flag indicates whether to append data to a new block instead of the 1857 * last block 1858 * @param progress for reporting write-progress; null is acceptable. 1859 * @param statistics file system statistics; null is acceptable. 1860 * @param favoredNodes FavoredNodes for new blocks 1861 * @return an output stream for writing into the file 1862 * @see ClientProtocol#append(String, String, EnumSetWritable) 1863 */ append(final String src, final int buffersize, EnumSet<CreateFlag> flag, final Progressable progress, final FileSystem.Statistics statistics, final InetSocketAddress[] favoredNodes)1864 public HdfsDataOutputStream append(final String src, final int buffersize, 1865 EnumSet<CreateFlag> flag, final Progressable progress, 1866 final FileSystem.Statistics statistics, 1867 final InetSocketAddress[] favoredNodes) throws IOException { 1868 final DFSOutputStream out = append(src, buffersize, flag, 1869 getFavoredNodesStr(favoredNodes), progress); 1870 return createWrappedOutputStream(out, statistics, out.getInitialLen()); 1871 } 1872 append(String src, int buffersize, EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress)1873 private DFSOutputStream append(String src, int buffersize, 1874 EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress) 1875 throws IOException { 1876 checkOpen(); 1877 final DFSOutputStream result = callAppend(src, buffersize, flag, progress, 1878 favoredNodes); 1879 beginFileLease(result.getFileId(), result); 1880 return result; 1881 } 1882 1883 /** 1884 * Set replication for an existing file. 1885 * @param src file name 1886 * @param replication replication to set the file to 1887 * 1888 * @see ClientProtocol#setReplication(String, short) 1889 */ setReplication(String src, short replication)1890 public boolean setReplication(String src, short replication) 1891 throws IOException { 1892 TraceScope scope = getPathTraceScope("setReplication", src); 1893 try { 1894 return namenode.setReplication(src, replication); 1895 } catch(RemoteException re) { 1896 throw re.unwrapRemoteException(AccessControlException.class, 1897 FileNotFoundException.class, 1898 SafeModeException.class, 1899 DSQuotaExceededException.class, 1900 UnresolvedPathException.class, 1901 SnapshotAccessControlException.class); 1902 } finally { 1903 scope.close(); 1904 } 1905 } 1906 1907 /** 1908 * Set storage policy for an existing file/directory 1909 * @param src file/directory name 1910 * @param policyName name of the storage policy 1911 */ setStoragePolicy(String src, String policyName)1912 public void setStoragePolicy(String src, String policyName) 1913 throws IOException { 1914 TraceScope scope = getPathTraceScope("setStoragePolicy", src); 1915 try { 1916 namenode.setStoragePolicy(src, policyName); 1917 } catch (RemoteException e) { 1918 throw e.unwrapRemoteException(AccessControlException.class, 1919 FileNotFoundException.class, 1920 SafeModeException.class, 1921 NSQuotaExceededException.class, 1922 UnresolvedPathException.class, 1923 SnapshotAccessControlException.class); 1924 } finally { 1925 scope.close(); 1926 } 1927 } 1928 1929 /** 1930 * @return All the existing storage policies 1931 */ getStoragePolicies()1932 public BlockStoragePolicy[] getStoragePolicies() throws IOException { 1933 TraceScope scope = Trace.startSpan("getStoragePolicies", traceSampler); 1934 try { 1935 return namenode.getStoragePolicies(); 1936 } finally { 1937 scope.close(); 1938 } 1939 } 1940 1941 /** 1942 * Rename file or directory. 1943 * @see ClientProtocol#rename(String, String) 1944 * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead. 1945 */ 1946 @Deprecated rename(String src, String dst)1947 public boolean rename(String src, String dst) throws IOException { 1948 checkOpen(); 1949 TraceScope scope = getSrcDstTraceScope("rename", src, dst); 1950 try { 1951 return namenode.rename(src, dst); 1952 } catch(RemoteException re) { 1953 throw re.unwrapRemoteException(AccessControlException.class, 1954 NSQuotaExceededException.class, 1955 DSQuotaExceededException.class, 1956 UnresolvedPathException.class, 1957 SnapshotAccessControlException.class); 1958 } finally { 1959 scope.close(); 1960 } 1961 } 1962 1963 /** 1964 * Move blocks from src to trg and delete src 1965 * See {@link ClientProtocol#concat}. 1966 */ concat(String trg, String [] srcs)1967 public void concat(String trg, String [] srcs) throws IOException { 1968 checkOpen(); 1969 TraceScope scope = Trace.startSpan("concat", traceSampler); 1970 try { 1971 namenode.concat(trg, srcs); 1972 } catch(RemoteException re) { 1973 throw re.unwrapRemoteException(AccessControlException.class, 1974 UnresolvedPathException.class, 1975 SnapshotAccessControlException.class); 1976 } finally { 1977 scope.close(); 1978 } 1979 } 1980 /** 1981 * Rename file or directory. 1982 * @see ClientProtocol#rename2(String, String, Options.Rename...) 1983 */ rename(String src, String dst, Options.Rename... options)1984 public void rename(String src, String dst, Options.Rename... options) 1985 throws IOException { 1986 checkOpen(); 1987 TraceScope scope = getSrcDstTraceScope("rename2", src, dst); 1988 try { 1989 namenode.rename2(src, dst, options); 1990 } catch(RemoteException re) { 1991 throw re.unwrapRemoteException(AccessControlException.class, 1992 DSQuotaExceededException.class, 1993 FileAlreadyExistsException.class, 1994 FileNotFoundException.class, 1995 ParentNotDirectoryException.class, 1996 SafeModeException.class, 1997 NSQuotaExceededException.class, 1998 UnresolvedPathException.class, 1999 SnapshotAccessControlException.class); 2000 } finally { 2001 scope.close(); 2002 } 2003 } 2004 2005 /** 2006 * Truncate a file to an indicated size 2007 * See {@link ClientProtocol#truncate}. 2008 */ truncate(String src, long newLength)2009 public boolean truncate(String src, long newLength) throws IOException { 2010 checkOpen(); 2011 if (newLength < 0) { 2012 throw new HadoopIllegalArgumentException( 2013 "Cannot truncate to a negative file size: " + newLength + "."); 2014 } 2015 try { 2016 return namenode.truncate(src, newLength, clientName); 2017 } catch (RemoteException re) { 2018 throw re.unwrapRemoteException(AccessControlException.class, 2019 UnresolvedPathException.class); 2020 } 2021 } 2022 2023 /** 2024 * Delete file or directory. 2025 * See {@link ClientProtocol#delete(String, boolean)}. 2026 */ 2027 @Deprecated delete(String src)2028 public boolean delete(String src) throws IOException { 2029 checkOpen(); 2030 return delete(src, true); 2031 } 2032 2033 /** 2034 * delete file or directory. 2035 * delete contents of the directory if non empty and recursive 2036 * set to true 2037 * 2038 * @see ClientProtocol#delete(String, boolean) 2039 */ delete(String src, boolean recursive)2040 public boolean delete(String src, boolean recursive) throws IOException { 2041 checkOpen(); 2042 TraceScope scope = getPathTraceScope("delete", src); 2043 try { 2044 return namenode.delete(src, recursive); 2045 } catch(RemoteException re) { 2046 throw re.unwrapRemoteException(AccessControlException.class, 2047 FileNotFoundException.class, 2048 SafeModeException.class, 2049 UnresolvedPathException.class, 2050 SnapshotAccessControlException.class); 2051 } finally { 2052 scope.close(); 2053 } 2054 } 2055 2056 /** Implemented using getFileInfo(src) 2057 */ exists(String src)2058 public boolean exists(String src) throws IOException { 2059 checkOpen(); 2060 return getFileInfo(src) != null; 2061 } 2062 2063 /** 2064 * Get a partial listing of the indicated directory 2065 * No block locations need to be fetched 2066 */ listPaths(String src, byte[] startAfter)2067 public DirectoryListing listPaths(String src, byte[] startAfter) 2068 throws IOException { 2069 return listPaths(src, startAfter, false); 2070 } 2071 2072 /** 2073 * Get a partial listing of the indicated directory 2074 * 2075 * Recommend to use HdfsFileStatus.EMPTY_NAME as startAfter 2076 * if the application wants to fetch a listing starting from 2077 * the first entry in the directory 2078 * 2079 * @see ClientProtocol#getListing(String, byte[], boolean) 2080 */ listPaths(String src, byte[] startAfter, boolean needLocation)2081 public DirectoryListing listPaths(String src, byte[] startAfter, 2082 boolean needLocation) throws IOException { 2083 checkOpen(); 2084 TraceScope scope = getPathTraceScope("listPaths", src); 2085 try { 2086 return namenode.getListing(src, startAfter, needLocation); 2087 } catch(RemoteException re) { 2088 throw re.unwrapRemoteException(AccessControlException.class, 2089 FileNotFoundException.class, 2090 UnresolvedPathException.class); 2091 } finally { 2092 scope.close(); 2093 } 2094 } 2095 2096 /** 2097 * Get the file info for a specific file or directory. 2098 * @param src The string representation of the path to the file 2099 * @return object containing information regarding the file 2100 * or null if file not found 2101 * 2102 * @see ClientProtocol#getFileInfo(String) for description of exceptions 2103 */ getFileInfo(String src)2104 public HdfsFileStatus getFileInfo(String src) throws IOException { 2105 checkOpen(); 2106 TraceScope scope = getPathTraceScope("getFileInfo", src); 2107 try { 2108 return namenode.getFileInfo(src); 2109 } catch(RemoteException re) { 2110 throw re.unwrapRemoteException(AccessControlException.class, 2111 FileNotFoundException.class, 2112 UnresolvedPathException.class); 2113 } finally { 2114 scope.close(); 2115 } 2116 } 2117 2118 /** 2119 * Close status of a file 2120 * @return true if file is already closed 2121 */ isFileClosed(String src)2122 public boolean isFileClosed(String src) throws IOException{ 2123 checkOpen(); 2124 TraceScope scope = getPathTraceScope("isFileClosed", src); 2125 try { 2126 return namenode.isFileClosed(src); 2127 } catch(RemoteException re) { 2128 throw re.unwrapRemoteException(AccessControlException.class, 2129 FileNotFoundException.class, 2130 UnresolvedPathException.class); 2131 } finally { 2132 scope.close(); 2133 } 2134 } 2135 2136 /** 2137 * Get the file info for a specific file or directory. If src 2138 * refers to a symlink then the FileStatus of the link is returned. 2139 * @param src path to a file or directory. 2140 * 2141 * For description of exceptions thrown 2142 * @see ClientProtocol#getFileLinkInfo(String) 2143 */ getFileLinkInfo(String src)2144 public HdfsFileStatus getFileLinkInfo(String src) throws IOException { 2145 checkOpen(); 2146 TraceScope scope = getPathTraceScope("getFileLinkInfo", src); 2147 try { 2148 return namenode.getFileLinkInfo(src); 2149 } catch(RemoteException re) { 2150 throw re.unwrapRemoteException(AccessControlException.class, 2151 UnresolvedPathException.class); 2152 } finally { 2153 scope.close(); 2154 } 2155 } 2156 2157 @InterfaceAudience.Private clearDataEncryptionKey()2158 public void clearDataEncryptionKey() { 2159 LOG.debug("Clearing encryption key"); 2160 synchronized (this) { 2161 encryptionKey = null; 2162 } 2163 } 2164 2165 /** 2166 * @return true if data sent between this client and DNs should be encrypted, 2167 * false otherwise. 2168 * @throws IOException in the event of error communicating with the NN 2169 */ shouldEncryptData()2170 boolean shouldEncryptData() throws IOException { 2171 FsServerDefaults d = getServerDefaults(); 2172 return d == null ? false : d.getEncryptDataTransfer(); 2173 } 2174 2175 @Override newDataEncryptionKey()2176 public DataEncryptionKey newDataEncryptionKey() throws IOException { 2177 if (shouldEncryptData()) { 2178 synchronized (this) { 2179 if (encryptionKey == null || 2180 encryptionKey.expiryDate < Time.now()) { 2181 LOG.debug("Getting new encryption token from NN"); 2182 encryptionKey = namenode.getDataEncryptionKey(); 2183 } 2184 return encryptionKey; 2185 } 2186 } else { 2187 return null; 2188 } 2189 } 2190 2191 /** 2192 * Get the checksum of the whole file of a range of the file. Note that the 2193 * range always starts from the beginning of the file. 2194 * @param src The file path 2195 * @param length the length of the range, i.e., the range is [0, length] 2196 * @return The checksum 2197 * @see DistributedFileSystem#getFileChecksum(Path) 2198 */ getFileChecksum(String src, long length)2199 public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) 2200 throws IOException { 2201 checkOpen(); 2202 Preconditions.checkArgument(length >= 0); 2203 //get block locations for the file range 2204 LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, 2205 length); 2206 if (null == blockLocations) { 2207 throw new FileNotFoundException("File does not exist: " + src); 2208 } 2209 List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks(); 2210 final DataOutputBuffer md5out = new DataOutputBuffer(); 2211 int bytesPerCRC = -1; 2212 DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; 2213 long crcPerBlock = 0; 2214 boolean refetchBlocks = false; 2215 int lastRetriedIndex = -1; 2216 2217 // get block checksum for each block 2218 long remaining = length; 2219 if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) { 2220 remaining = Math.min(length, blockLocations.getFileLength()); 2221 } 2222 for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) { 2223 if (refetchBlocks) { // refetch to get fresh tokens 2224 blockLocations = callGetBlockLocations(namenode, src, 0, length); 2225 if (null == blockLocations) { 2226 throw new FileNotFoundException("File does not exist: " + src); 2227 } 2228 locatedblocks = blockLocations.getLocatedBlocks(); 2229 refetchBlocks = false; 2230 } 2231 LocatedBlock lb = locatedblocks.get(i); 2232 final ExtendedBlock block = lb.getBlock(); 2233 if (remaining < block.getNumBytes()) { 2234 block.setNumBytes(remaining); 2235 } 2236 remaining -= block.getNumBytes(); 2237 final DatanodeInfo[] datanodes = lb.getLocations(); 2238 2239 //try each datanode location of the block 2240 final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout; 2241 boolean done = false; 2242 for(int j = 0; !done && j < datanodes.length; j++) { 2243 DataOutputStream out = null; 2244 DataInputStream in = null; 2245 2246 try { 2247 //connect to a datanode 2248 IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); 2249 out = new DataOutputStream(new BufferedOutputStream(pair.out, 2250 HdfsConstants.SMALL_BUFFER_SIZE)); 2251 in = new DataInputStream(pair.in); 2252 2253 if (LOG.isDebugEnabled()) { 2254 LOG.debug("write to " + datanodes[j] + ": " 2255 + Op.BLOCK_CHECKSUM + ", block=" + block); 2256 } 2257 // get block MD5 2258 new Sender(out).blockChecksum(block, lb.getBlockToken()); 2259 2260 final BlockOpResponseProto reply = 2261 BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); 2262 2263 String logInfo = "for block " + block + " from datanode " + datanodes[j]; 2264 DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); 2265 2266 OpBlockChecksumResponseProto checksumData = 2267 reply.getChecksumResponse(); 2268 2269 //read byte-per-checksum 2270 final int bpc = checksumData.getBytesPerCrc(); 2271 if (i == 0) { //first block 2272 bytesPerCRC = bpc; 2273 } 2274 else if (bpc != bytesPerCRC) { 2275 throw new IOException("Byte-per-checksum not matched: bpc=" + bpc 2276 + " but bytesPerCRC=" + bytesPerCRC); 2277 } 2278 2279 //read crc-per-block 2280 final long cpb = checksumData.getCrcPerBlock(); 2281 if (locatedblocks.size() > 1 && i == 0) { 2282 crcPerBlock = cpb; 2283 } 2284 2285 //read md5 2286 final MD5Hash md5 = new MD5Hash( 2287 checksumData.getMd5().toByteArray()); 2288 md5.write(md5out); 2289 2290 // read crc-type 2291 final DataChecksum.Type ct; 2292 if (checksumData.hasCrcType()) { 2293 ct = PBHelper.convert(checksumData 2294 .getCrcType()); 2295 } else { 2296 LOG.debug("Retrieving checksum from an earlier-version DataNode: " + 2297 "inferring checksum by reading first byte"); 2298 ct = inferChecksumTypeByReading(lb, datanodes[j]); 2299 } 2300 2301 if (i == 0) { // first block 2302 crcType = ct; 2303 } else if (crcType != DataChecksum.Type.MIXED 2304 && crcType != ct) { 2305 // if crc types are mixed in a file 2306 crcType = DataChecksum.Type.MIXED; 2307 } 2308 2309 done = true; 2310 2311 if (LOG.isDebugEnabled()) { 2312 if (i == 0) { 2313 LOG.debug("set bytesPerCRC=" + bytesPerCRC 2314 + ", crcPerBlock=" + crcPerBlock); 2315 } 2316 LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5); 2317 } 2318 } catch (InvalidBlockTokenException ibte) { 2319 if (i > lastRetriedIndex) { 2320 if (LOG.isDebugEnabled()) { 2321 LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " 2322 + "for file " + src + " for block " + block 2323 + " from datanode " + datanodes[j] 2324 + ". Will retry the block once."); 2325 } 2326 lastRetriedIndex = i; 2327 done = true; // actually it's not done; but we'll retry 2328 i--; // repeat at i-th block 2329 refetchBlocks = true; 2330 break; 2331 } 2332 } catch (IOException ie) { 2333 LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie); 2334 } finally { 2335 IOUtils.closeStream(in); 2336 IOUtils.closeStream(out); 2337 } 2338 } 2339 2340 if (!done) { 2341 throw new IOException("Fail to get block MD5 for " + block); 2342 } 2343 } 2344 2345 //compute file MD5 2346 final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); 2347 switch (crcType) { 2348 case CRC32: 2349 return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, 2350 crcPerBlock, fileMD5); 2351 case CRC32C: 2352 return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, 2353 crcPerBlock, fileMD5); 2354 default: 2355 // If there is no block allocated for the file, 2356 // return one with the magic entry that matches what previous 2357 // hdfs versions return. 2358 if (locatedblocks.size() == 0) { 2359 return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); 2360 } 2361 2362 // we should never get here since the validity was checked 2363 // when getCrcType() was called above. 2364 return null; 2365 } 2366 } 2367 2368 /** 2369 * Connect to the given datanode's datantrasfer port, and return 2370 * the resulting IOStreamPair. This includes encryption wrapping, etc. 2371 */ connectToDN(DatanodeInfo dn, int timeout, LocatedBlock lb)2372 private IOStreamPair connectToDN(DatanodeInfo dn, int timeout, 2373 LocatedBlock lb) throws IOException { 2374 boolean success = false; 2375 Socket sock = null; 2376 try { 2377 sock = socketFactory.createSocket(); 2378 String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname); 2379 if (LOG.isDebugEnabled()) { 2380 LOG.debug("Connecting to datanode " + dnAddr); 2381 } 2382 NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); 2383 sock.setSoTimeout(timeout); 2384 2385 OutputStream unbufOut = NetUtils.getOutputStream(sock); 2386 InputStream unbufIn = NetUtils.getInputStream(sock); 2387 IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this, 2388 lb.getBlockToken(), dn); 2389 success = true; 2390 return ret; 2391 } finally { 2392 if (!success) { 2393 IOUtils.closeSocket(sock); 2394 } 2395 } 2396 } 2397 2398 /** 2399 * Infer the checksum type for a replica by sending an OP_READ_BLOCK 2400 * for the first byte of that replica. This is used for compatibility 2401 * with older HDFS versions which did not include the checksum type in 2402 * OpBlockChecksumResponseProto. 2403 * 2404 * @param lb the located block 2405 * @param dn the connected datanode 2406 * @return the inferred checksum type 2407 * @throws IOException if an error occurs 2408 */ inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)2409 private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) 2410 throws IOException { 2411 IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); 2412 2413 try { 2414 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, 2415 HdfsConstants.SMALL_BUFFER_SIZE)); 2416 DataInputStream in = new DataInputStream(pair.in); 2417 2418 new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 2419 0, 1, true, CachingStrategy.newDefaultStrategy()); 2420 final BlockOpResponseProto reply = 2421 BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); 2422 String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; 2423 DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); 2424 2425 return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); 2426 } finally { 2427 IOUtils.cleanup(null, pair.in, pair.out); 2428 } 2429 } 2430 2431 /** 2432 * Set permissions to a file or directory. 2433 * @param src path name. 2434 * @param permission permission to set to 2435 * 2436 * @see ClientProtocol#setPermission(String, FsPermission) 2437 */ setPermission(String src, FsPermission permission)2438 public void setPermission(String src, FsPermission permission) 2439 throws IOException { 2440 checkOpen(); 2441 TraceScope scope = getPathTraceScope("setPermission", src); 2442 try { 2443 namenode.setPermission(src, permission); 2444 } catch(RemoteException re) { 2445 throw re.unwrapRemoteException(AccessControlException.class, 2446 FileNotFoundException.class, 2447 SafeModeException.class, 2448 UnresolvedPathException.class, 2449 SnapshotAccessControlException.class); 2450 } finally { 2451 scope.close(); 2452 } 2453 } 2454 2455 /** 2456 * Set file or directory owner. 2457 * @param src path name. 2458 * @param username user id. 2459 * @param groupname user group. 2460 * 2461 * @see ClientProtocol#setOwner(String, String, String) 2462 */ setOwner(String src, String username, String groupname)2463 public void setOwner(String src, String username, String groupname) 2464 throws IOException { 2465 checkOpen(); 2466 TraceScope scope = getPathTraceScope("setOwner", src); 2467 try { 2468 namenode.setOwner(src, username, groupname); 2469 } catch(RemoteException re) { 2470 throw re.unwrapRemoteException(AccessControlException.class, 2471 FileNotFoundException.class, 2472 SafeModeException.class, 2473 UnresolvedPathException.class, 2474 SnapshotAccessControlException.class); 2475 } finally { 2476 scope.close(); 2477 } 2478 } 2479 callGetStats()2480 private long[] callGetStats() throws IOException { 2481 checkOpen(); 2482 TraceScope scope = Trace.startSpan("getStats", traceSampler); 2483 try { 2484 return namenode.getStats(); 2485 } finally { 2486 scope.close(); 2487 } 2488 } 2489 2490 /** 2491 * @see ClientProtocol#getStats() 2492 */ getDiskStatus()2493 public FsStatus getDiskStatus() throws IOException { 2494 long rawNums[] = callGetStats(); 2495 return new FsStatus(rawNums[0], rawNums[1], rawNums[2]); 2496 } 2497 2498 /** 2499 * Returns count of blocks with no good replicas left. Normally should be 2500 * zero. 2501 * @throws IOException 2502 */ getMissingBlocksCount()2503 public long getMissingBlocksCount() throws IOException { 2504 return callGetStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]; 2505 } 2506 2507 /** 2508 * Returns count of blocks with replication factor 1 and have 2509 * lost the only replica. 2510 * @throws IOException 2511 */ getMissingReplOneBlocksCount()2512 public long getMissingReplOneBlocksCount() throws IOException { 2513 return callGetStats()[ClientProtocol. 2514 GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX]; 2515 } 2516 2517 /** 2518 * Returns count of blocks with one of more replica missing. 2519 * @throws IOException 2520 */ getUnderReplicatedBlocksCount()2521 public long getUnderReplicatedBlocksCount() throws IOException { 2522 return callGetStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]; 2523 } 2524 2525 /** 2526 * Returns count of blocks with at least one replica marked corrupt. 2527 * @throws IOException 2528 */ getCorruptBlocksCount()2529 public long getCorruptBlocksCount() throws IOException { 2530 return callGetStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]; 2531 } 2532 2533 /** 2534 * @return a list in which each entry describes a corrupt file/block 2535 * @throws IOException 2536 */ listCorruptFileBlocks(String path, String cookie)2537 public CorruptFileBlocks listCorruptFileBlocks(String path, 2538 String cookie) 2539 throws IOException { 2540 checkOpen(); 2541 TraceScope scope = getPathTraceScope("listCorruptFileBlocks", path); 2542 try { 2543 return namenode.listCorruptFileBlocks(path, cookie); 2544 } finally { 2545 scope.close(); 2546 } 2547 } 2548 datanodeReport(DatanodeReportType type)2549 public DatanodeInfo[] datanodeReport(DatanodeReportType type) 2550 throws IOException { 2551 checkOpen(); 2552 TraceScope scope = Trace.startSpan("datanodeReport", traceSampler); 2553 try { 2554 return namenode.getDatanodeReport(type); 2555 } finally { 2556 scope.close(); 2557 } 2558 } 2559 getDatanodeStorageReport( DatanodeReportType type)2560 public DatanodeStorageReport[] getDatanodeStorageReport( 2561 DatanodeReportType type) throws IOException { 2562 checkOpen(); 2563 TraceScope scope = 2564 Trace.startSpan("datanodeStorageReport", traceSampler); 2565 try { 2566 return namenode.getDatanodeStorageReport(type); 2567 } finally { 2568 scope.close(); 2569 } 2570 } 2571 2572 /** 2573 * Enter, leave or get safe mode. 2574 * 2575 * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean) 2576 */ setSafeMode(SafeModeAction action)2577 public boolean setSafeMode(SafeModeAction action) throws IOException { 2578 return setSafeMode(action, false); 2579 } 2580 2581 /** 2582 * Enter, leave or get safe mode. 2583 * 2584 * @param action 2585 * One of SafeModeAction.GET, SafeModeAction.ENTER and 2586 * SafeModeActiob.LEAVE 2587 * @param isChecked 2588 * If true, then check only active namenode's safemode status, else 2589 * check first namenode's status. 2590 * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean) 2591 */ setSafeMode(SafeModeAction action, boolean isChecked)2592 public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{ 2593 TraceScope scope = 2594 Trace.startSpan("setSafeMode", traceSampler); 2595 try { 2596 return namenode.setSafeMode(action, isChecked); 2597 } finally { 2598 scope.close(); 2599 } 2600 } 2601 2602 /** 2603 * Create one snapshot. 2604 * 2605 * @param snapshotRoot The directory where the snapshot is to be taken 2606 * @param snapshotName Name of the snapshot 2607 * @return the snapshot path. 2608 * @see ClientProtocol#createSnapshot(String, String) 2609 */ createSnapshot(String snapshotRoot, String snapshotName)2610 public String createSnapshot(String snapshotRoot, String snapshotName) 2611 throws IOException { 2612 checkOpen(); 2613 TraceScope scope = Trace.startSpan("createSnapshot", traceSampler); 2614 try { 2615 return namenode.createSnapshot(snapshotRoot, snapshotName); 2616 } catch(RemoteException re) { 2617 throw re.unwrapRemoteException(); 2618 } finally { 2619 scope.close(); 2620 } 2621 } 2622 2623 /** 2624 * Delete a snapshot of a snapshottable directory. 2625 * 2626 * @param snapshotRoot The snapshottable directory that the 2627 * to-be-deleted snapshot belongs to 2628 * @param snapshotName The name of the to-be-deleted snapshot 2629 * @throws IOException 2630 * @see ClientProtocol#deleteSnapshot(String, String) 2631 */ deleteSnapshot(String snapshotRoot, String snapshotName)2632 public void deleteSnapshot(String snapshotRoot, String snapshotName) 2633 throws IOException { 2634 checkOpen(); 2635 TraceScope scope = Trace.startSpan("deleteSnapshot", traceSampler); 2636 try { 2637 namenode.deleteSnapshot(snapshotRoot, snapshotName); 2638 } catch(RemoteException re) { 2639 throw re.unwrapRemoteException(); 2640 } finally { 2641 scope.close(); 2642 } 2643 } 2644 2645 /** 2646 * Rename a snapshot. 2647 * @param snapshotDir The directory path where the snapshot was taken 2648 * @param snapshotOldName Old name of the snapshot 2649 * @param snapshotNewName New name of the snapshot 2650 * @throws IOException 2651 * @see ClientProtocol#renameSnapshot(String, String, String) 2652 */ renameSnapshot(String snapshotDir, String snapshotOldName, String snapshotNewName)2653 public void renameSnapshot(String snapshotDir, String snapshotOldName, 2654 String snapshotNewName) throws IOException { 2655 checkOpen(); 2656 TraceScope scope = Trace.startSpan("renameSnapshot", traceSampler); 2657 try { 2658 namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName); 2659 } catch(RemoteException re) { 2660 throw re.unwrapRemoteException(); 2661 } finally { 2662 scope.close(); 2663 } 2664 } 2665 2666 /** 2667 * Get all the current snapshottable directories. 2668 * @return All the current snapshottable directories 2669 * @throws IOException 2670 * @see ClientProtocol#getSnapshottableDirListing() 2671 */ getSnapshottableDirListing()2672 public SnapshottableDirectoryStatus[] getSnapshottableDirListing() 2673 throws IOException { 2674 checkOpen(); 2675 TraceScope scope = Trace.startSpan("getSnapshottableDirListing", 2676 traceSampler); 2677 try { 2678 return namenode.getSnapshottableDirListing(); 2679 } catch(RemoteException re) { 2680 throw re.unwrapRemoteException(); 2681 } finally { 2682 scope.close(); 2683 } 2684 } 2685 2686 /** 2687 * Allow snapshot on a directory. 2688 * 2689 * @see ClientProtocol#allowSnapshot(String snapshotRoot) 2690 */ allowSnapshot(String snapshotRoot)2691 public void allowSnapshot(String snapshotRoot) throws IOException { 2692 checkOpen(); 2693 TraceScope scope = Trace.startSpan("allowSnapshot", traceSampler); 2694 try { 2695 namenode.allowSnapshot(snapshotRoot); 2696 } catch (RemoteException re) { 2697 throw re.unwrapRemoteException(); 2698 } finally { 2699 scope.close(); 2700 } 2701 } 2702 2703 /** 2704 * Disallow snapshot on a directory. 2705 * 2706 * @see ClientProtocol#disallowSnapshot(String snapshotRoot) 2707 */ disallowSnapshot(String snapshotRoot)2708 public void disallowSnapshot(String snapshotRoot) throws IOException { 2709 checkOpen(); 2710 TraceScope scope = Trace.startSpan("disallowSnapshot", traceSampler); 2711 try { 2712 namenode.disallowSnapshot(snapshotRoot); 2713 } catch (RemoteException re) { 2714 throw re.unwrapRemoteException(); 2715 } finally { 2716 scope.close(); 2717 } 2718 } 2719 2720 /** 2721 * Get the difference between two snapshots, or between a snapshot and the 2722 * current tree of a directory. 2723 * @see ClientProtocol#getSnapshotDiffReport(String, String, String) 2724 */ getSnapshotDiffReport(String snapshotDir, String fromSnapshot, String toSnapshot)2725 public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir, 2726 String fromSnapshot, String toSnapshot) throws IOException { 2727 checkOpen(); 2728 TraceScope scope = Trace.startSpan("getSnapshotDiffReport", traceSampler); 2729 try { 2730 return namenode.getSnapshotDiffReport(snapshotDir, 2731 fromSnapshot, toSnapshot); 2732 } catch(RemoteException re) { 2733 throw re.unwrapRemoteException(); 2734 } finally { 2735 scope.close(); 2736 } 2737 } 2738 addCacheDirective( CacheDirectiveInfo info, EnumSet<CacheFlag> flags)2739 public long addCacheDirective( 2740 CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException { 2741 checkOpen(); 2742 TraceScope scope = Trace.startSpan("addCacheDirective", traceSampler); 2743 try { 2744 return namenode.addCacheDirective(info, flags); 2745 } catch (RemoteException re) { 2746 throw re.unwrapRemoteException(); 2747 } finally { 2748 scope.close(); 2749 } 2750 } 2751 modifyCacheDirective( CacheDirectiveInfo info, EnumSet<CacheFlag> flags)2752 public void modifyCacheDirective( 2753 CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException { 2754 checkOpen(); 2755 TraceScope scope = Trace.startSpan("modifyCacheDirective", traceSampler); 2756 try { 2757 namenode.modifyCacheDirective(info, flags); 2758 } catch (RemoteException re) { 2759 throw re.unwrapRemoteException(); 2760 } finally { 2761 scope.close(); 2762 } 2763 } 2764 removeCacheDirective(long id)2765 public void removeCacheDirective(long id) 2766 throws IOException { 2767 checkOpen(); 2768 TraceScope scope = Trace.startSpan("removeCacheDirective", traceSampler); 2769 try { 2770 namenode.removeCacheDirective(id); 2771 } catch (RemoteException re) { 2772 throw re.unwrapRemoteException(); 2773 } finally { 2774 scope.close(); 2775 } 2776 } 2777 listCacheDirectives( CacheDirectiveInfo filter)2778 public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( 2779 CacheDirectiveInfo filter) throws IOException { 2780 return new CacheDirectiveIterator(namenode, filter, traceSampler); 2781 } 2782 addCachePool(CachePoolInfo info)2783 public void addCachePool(CachePoolInfo info) throws IOException { 2784 checkOpen(); 2785 TraceScope scope = Trace.startSpan("addCachePool", traceSampler); 2786 try { 2787 namenode.addCachePool(info); 2788 } catch (RemoteException re) { 2789 throw re.unwrapRemoteException(); 2790 } finally { 2791 scope.close(); 2792 } 2793 } 2794 modifyCachePool(CachePoolInfo info)2795 public void modifyCachePool(CachePoolInfo info) throws IOException { 2796 checkOpen(); 2797 TraceScope scope = Trace.startSpan("modifyCachePool", traceSampler); 2798 try { 2799 namenode.modifyCachePool(info); 2800 } catch (RemoteException re) { 2801 throw re.unwrapRemoteException(); 2802 } finally { 2803 scope.close(); 2804 } 2805 } 2806 removeCachePool(String poolName)2807 public void removeCachePool(String poolName) throws IOException { 2808 checkOpen(); 2809 TraceScope scope = Trace.startSpan("removeCachePool", traceSampler); 2810 try { 2811 namenode.removeCachePool(poolName); 2812 } catch (RemoteException re) { 2813 throw re.unwrapRemoteException(); 2814 } finally { 2815 scope.close(); 2816 } 2817 } 2818 listCachePools()2819 public RemoteIterator<CachePoolEntry> listCachePools() throws IOException { 2820 return new CachePoolIterator(namenode, traceSampler); 2821 } 2822 2823 /** 2824 * Save namespace image. 2825 * 2826 * @see ClientProtocol#saveNamespace() 2827 */ saveNamespace()2828 void saveNamespace() throws AccessControlException, IOException { 2829 TraceScope scope = Trace.startSpan("saveNamespace", traceSampler); 2830 try { 2831 namenode.saveNamespace(); 2832 } catch(RemoteException re) { 2833 throw re.unwrapRemoteException(AccessControlException.class); 2834 } finally { 2835 scope.close(); 2836 } 2837 } 2838 2839 /** 2840 * Rolls the edit log on the active NameNode. 2841 * @return the txid of the new log segment 2842 * 2843 * @see ClientProtocol#rollEdits() 2844 */ rollEdits()2845 long rollEdits() throws AccessControlException, IOException { 2846 TraceScope scope = Trace.startSpan("rollEdits", traceSampler); 2847 try { 2848 return namenode.rollEdits(); 2849 } catch(RemoteException re) { 2850 throw re.unwrapRemoteException(AccessControlException.class); 2851 } finally { 2852 scope.close(); 2853 } 2854 } 2855 2856 @VisibleForTesting getPreviousBlock(long fileId)2857 ExtendedBlock getPreviousBlock(long fileId) { 2858 return filesBeingWritten.get(fileId).getBlock(); 2859 } 2860 2861 /** 2862 * enable/disable restore failed storage. 2863 * 2864 * @see ClientProtocol#restoreFailedStorage(String arg) 2865 */ restoreFailedStorage(String arg)2866 boolean restoreFailedStorage(String arg) 2867 throws AccessControlException, IOException{ 2868 TraceScope scope = Trace.startSpan("restoreFailedStorage", traceSampler); 2869 try { 2870 return namenode.restoreFailedStorage(arg); 2871 } finally { 2872 scope.close(); 2873 } 2874 } 2875 2876 /** 2877 * Refresh the hosts and exclude files. (Rereads them.) 2878 * See {@link ClientProtocol#refreshNodes()} 2879 * for more details. 2880 * 2881 * @see ClientProtocol#refreshNodes() 2882 */ refreshNodes()2883 public void refreshNodes() throws IOException { 2884 TraceScope scope = Trace.startSpan("refreshNodes", traceSampler); 2885 try { 2886 namenode.refreshNodes(); 2887 } finally { 2888 scope.close(); 2889 } 2890 } 2891 2892 /** 2893 * Dumps DFS data structures into specified file. 2894 * 2895 * @see ClientProtocol#metaSave(String) 2896 */ metaSave(String pathname)2897 public void metaSave(String pathname) throws IOException { 2898 TraceScope scope = Trace.startSpan("metaSave", traceSampler); 2899 try { 2900 namenode.metaSave(pathname); 2901 } finally { 2902 scope.close(); 2903 } 2904 } 2905 2906 /** 2907 * Requests the namenode to tell all datanodes to use a new, non-persistent 2908 * bandwidth value for dfs.balance.bandwidthPerSec. 2909 * See {@link ClientProtocol#setBalancerBandwidth(long)} 2910 * for more details. 2911 * 2912 * @see ClientProtocol#setBalancerBandwidth(long) 2913 */ setBalancerBandwidth(long bandwidth)2914 public void setBalancerBandwidth(long bandwidth) throws IOException { 2915 TraceScope scope = Trace.startSpan("setBalancerBandwidth", traceSampler); 2916 try { 2917 namenode.setBalancerBandwidth(bandwidth); 2918 } finally { 2919 scope.close(); 2920 } 2921 } 2922 2923 /** 2924 * @see ClientProtocol#finalizeUpgrade() 2925 */ finalizeUpgrade()2926 public void finalizeUpgrade() throws IOException { 2927 TraceScope scope = Trace.startSpan("finalizeUpgrade", traceSampler); 2928 try { 2929 namenode.finalizeUpgrade(); 2930 } finally { 2931 scope.close(); 2932 } 2933 } 2934 rollingUpgrade(RollingUpgradeAction action)2935 RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException { 2936 TraceScope scope = Trace.startSpan("rollingUpgrade", traceSampler); 2937 try { 2938 return namenode.rollingUpgrade(action); 2939 } finally { 2940 scope.close(); 2941 } 2942 } 2943 2944 /** 2945 */ 2946 @Deprecated mkdirs(String src)2947 public boolean mkdirs(String src) throws IOException { 2948 return mkdirs(src, null, true); 2949 } 2950 2951 /** 2952 * Create a directory (or hierarchy of directories) with the given 2953 * name and permission. 2954 * 2955 * @param src The path of the directory being created 2956 * @param permission The permission of the directory being created. 2957 * If permission == null, use {@link FsPermission#getDefault()}. 2958 * @param createParent create missing parent directory if true 2959 * 2960 * @return True if the operation success. 2961 * 2962 * @see ClientProtocol#mkdirs(String, FsPermission, boolean) 2963 */ mkdirs(String src, FsPermission permission, boolean createParent)2964 public boolean mkdirs(String src, FsPermission permission, 2965 boolean createParent) throws IOException { 2966 if (permission == null) { 2967 permission = FsPermission.getDefault(); 2968 } 2969 FsPermission masked = permission.applyUMask(dfsClientConf.uMask); 2970 return primitiveMkdir(src, masked, createParent); 2971 } 2972 2973 /** 2974 * Same {{@link #mkdirs(String, FsPermission, boolean)} except 2975 * that the permissions has already been masked against umask. 2976 */ primitiveMkdir(String src, FsPermission absPermission)2977 public boolean primitiveMkdir(String src, FsPermission absPermission) 2978 throws IOException { 2979 return primitiveMkdir(src, absPermission, true); 2980 } 2981 2982 /** 2983 * Same {{@link #mkdirs(String, FsPermission, boolean)} except 2984 * that the permissions has already been masked against umask. 2985 */ primitiveMkdir(String src, FsPermission absPermission, boolean createParent)2986 public boolean primitiveMkdir(String src, FsPermission absPermission, 2987 boolean createParent) 2988 throws IOException { 2989 checkOpen(); 2990 if (absPermission == null) { 2991 absPermission = 2992 FsPermission.getDefault().applyUMask(dfsClientConf.uMask); 2993 } 2994 2995 if(LOG.isDebugEnabled()) { 2996 LOG.debug(src + ": masked=" + absPermission); 2997 } 2998 TraceScope scope = Trace.startSpan("mkdir", traceSampler); 2999 try { 3000 return namenode.mkdirs(src, absPermission, createParent); 3001 } catch(RemoteException re) { 3002 throw re.unwrapRemoteException(AccessControlException.class, 3003 InvalidPathException.class, 3004 FileAlreadyExistsException.class, 3005 FileNotFoundException.class, 3006 ParentNotDirectoryException.class, 3007 SafeModeException.class, 3008 NSQuotaExceededException.class, 3009 DSQuotaExceededException.class, 3010 UnresolvedPathException.class, 3011 SnapshotAccessControlException.class); 3012 } finally { 3013 scope.close(); 3014 } 3015 } 3016 3017 /** 3018 * Get {@link ContentSummary} rooted at the specified directory. 3019 * @param src The string representation of the path 3020 * 3021 * @see ClientProtocol#getContentSummary(String) 3022 */ getContentSummary(String src)3023 ContentSummary getContentSummary(String src) throws IOException { 3024 TraceScope scope = getPathTraceScope("getContentSummary", src); 3025 try { 3026 return namenode.getContentSummary(src); 3027 } catch(RemoteException re) { 3028 throw re.unwrapRemoteException(AccessControlException.class, 3029 FileNotFoundException.class, 3030 UnresolvedPathException.class); 3031 } finally { 3032 scope.close(); 3033 } 3034 } 3035 3036 /** 3037 * Sets or resets quotas for a directory. 3038 * @see ClientProtocol#setQuota(String, long, long, StorageType) 3039 */ setQuota(String src, long namespaceQuota, long storagespaceQuota)3040 void setQuota(String src, long namespaceQuota, long storagespaceQuota) 3041 throws IOException { 3042 // sanity check 3043 if ((namespaceQuota <= 0 && namespaceQuota != HdfsConstants.QUOTA_DONT_SET && 3044 namespaceQuota != HdfsConstants.QUOTA_RESET) || 3045 (storagespaceQuota <= 0 && storagespaceQuota != HdfsConstants.QUOTA_DONT_SET && 3046 storagespaceQuota != HdfsConstants.QUOTA_RESET)) { 3047 throw new IllegalArgumentException("Invalid values for quota : " + 3048 namespaceQuota + " and " + 3049 storagespaceQuota); 3050 3051 } 3052 TraceScope scope = getPathTraceScope("setQuota", src); 3053 try { 3054 // Pass null as storage type for traditional namespace/storagespace quota. 3055 namenode.setQuota(src, namespaceQuota, storagespaceQuota, null); 3056 } catch(RemoteException re) { 3057 throw re.unwrapRemoteException(AccessControlException.class, 3058 FileNotFoundException.class, 3059 NSQuotaExceededException.class, 3060 DSQuotaExceededException.class, 3061 UnresolvedPathException.class, 3062 SnapshotAccessControlException.class); 3063 } finally { 3064 scope.close(); 3065 } 3066 } 3067 3068 /** 3069 * Sets or resets quotas by storage type for a directory. 3070 * @see ClientProtocol#setQuota(String, long, long, StorageType) 3071 */ setQuotaByStorageType(String src, StorageType type, long quota)3072 void setQuotaByStorageType(String src, StorageType type, long quota) 3073 throws IOException { 3074 if (quota <= 0 && quota != HdfsConstants.QUOTA_DONT_SET && 3075 quota != HdfsConstants.QUOTA_RESET) { 3076 throw new IllegalArgumentException("Invalid values for quota :" + 3077 quota); 3078 } 3079 if (type == null) { 3080 throw new IllegalArgumentException("Invalid storage type(null)"); 3081 } 3082 if (!type.supportTypeQuota()) { 3083 throw new IllegalArgumentException("Don't support Quota for storage type : " 3084 + type.toString()); 3085 } 3086 TraceScope scope = getPathTraceScope("setQuotaByStorageType", src); 3087 try { 3088 namenode.setQuota(src, HdfsConstants.QUOTA_DONT_SET, quota, type); 3089 } catch (RemoteException re) { 3090 throw re.unwrapRemoteException(AccessControlException.class, 3091 FileNotFoundException.class, 3092 QuotaByStorageTypeExceededException.class, 3093 UnresolvedPathException.class, 3094 SnapshotAccessControlException.class); 3095 } finally { 3096 scope.close(); 3097 } 3098 } 3099 /** 3100 * set the modification and access time of a file 3101 * 3102 * @see ClientProtocol#setTimes(String, long, long) 3103 */ setTimes(String src, long mtime, long atime)3104 public void setTimes(String src, long mtime, long atime) throws IOException { 3105 checkOpen(); 3106 TraceScope scope = getPathTraceScope("setTimes", src); 3107 try { 3108 namenode.setTimes(src, mtime, atime); 3109 } catch(RemoteException re) { 3110 throw re.unwrapRemoteException(AccessControlException.class, 3111 FileNotFoundException.class, 3112 UnresolvedPathException.class, 3113 SnapshotAccessControlException.class); 3114 } finally { 3115 scope.close(); 3116 } 3117 } 3118 3119 /** 3120 * @deprecated use {@link HdfsDataInputStream} instead. 3121 */ 3122 @Deprecated 3123 public static class DFSDataInputStream extends HdfsDataInputStream { 3124 DFSDataInputStream(DFSInputStream in)3125 public DFSDataInputStream(DFSInputStream in) throws IOException { 3126 super(in); 3127 } 3128 } 3129 reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn)3130 void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) { 3131 DatanodeInfo [] dnArr = { dn }; 3132 LocatedBlock [] lblocks = { new LocatedBlock(blk, dnArr) }; 3133 reportChecksumFailure(file, lblocks); 3134 } 3135 3136 // just reports checksum failure and ignores any exception during the report. reportChecksumFailure(String file, LocatedBlock lblocks[])3137 void reportChecksumFailure(String file, LocatedBlock lblocks[]) { 3138 try { 3139 reportBadBlocks(lblocks); 3140 } catch (IOException ie) { 3141 LOG.info("Found corruption while reading " + file 3142 + ". Error repairing corrupt blocks. Bad blocks remain.", ie); 3143 } 3144 } 3145 3146 @Override toString()3147 public String toString() { 3148 return getClass().getSimpleName() + "[clientName=" + clientName 3149 + ", ugi=" + ugi + "]"; 3150 } 3151 getDefaultReadCachingStrategy()3152 public CachingStrategy getDefaultReadCachingStrategy() { 3153 return defaultReadCachingStrategy; 3154 } 3155 getDefaultWriteCachingStrategy()3156 public CachingStrategy getDefaultWriteCachingStrategy() { 3157 return defaultWriteCachingStrategy; 3158 } 3159 getClientContext()3160 public ClientContext getClientContext() { 3161 return clientContext; 3162 } 3163 modifyAclEntries(String src, List<AclEntry> aclSpec)3164 public void modifyAclEntries(String src, List<AclEntry> aclSpec) 3165 throws IOException { 3166 checkOpen(); 3167 TraceScope scope = getPathTraceScope("modifyAclEntries", src); 3168 try { 3169 namenode.modifyAclEntries(src, aclSpec); 3170 } catch(RemoteException re) { 3171 throw re.unwrapRemoteException(AccessControlException.class, 3172 AclException.class, 3173 FileNotFoundException.class, 3174 NSQuotaExceededException.class, 3175 SafeModeException.class, 3176 SnapshotAccessControlException.class, 3177 UnresolvedPathException.class); 3178 } finally { 3179 scope.close(); 3180 } 3181 } 3182 removeAclEntries(String src, List<AclEntry> aclSpec)3183 public void removeAclEntries(String src, List<AclEntry> aclSpec) 3184 throws IOException { 3185 checkOpen(); 3186 TraceScope scope = Trace.startSpan("removeAclEntries", traceSampler); 3187 try { 3188 namenode.removeAclEntries(src, aclSpec); 3189 } catch(RemoteException re) { 3190 throw re.unwrapRemoteException(AccessControlException.class, 3191 AclException.class, 3192 FileNotFoundException.class, 3193 NSQuotaExceededException.class, 3194 SafeModeException.class, 3195 SnapshotAccessControlException.class, 3196 UnresolvedPathException.class); 3197 } finally { 3198 scope.close(); 3199 } 3200 } 3201 removeDefaultAcl(String src)3202 public void removeDefaultAcl(String src) throws IOException { 3203 checkOpen(); 3204 TraceScope scope = Trace.startSpan("removeDefaultAcl", traceSampler); 3205 try { 3206 namenode.removeDefaultAcl(src); 3207 } catch(RemoteException re) { 3208 throw re.unwrapRemoteException(AccessControlException.class, 3209 AclException.class, 3210 FileNotFoundException.class, 3211 NSQuotaExceededException.class, 3212 SafeModeException.class, 3213 SnapshotAccessControlException.class, 3214 UnresolvedPathException.class); 3215 } finally { 3216 scope.close(); 3217 } 3218 } 3219 removeAcl(String src)3220 public void removeAcl(String src) throws IOException { 3221 checkOpen(); 3222 TraceScope scope = Trace.startSpan("removeAcl", traceSampler); 3223 try { 3224 namenode.removeAcl(src); 3225 } catch(RemoteException re) { 3226 throw re.unwrapRemoteException(AccessControlException.class, 3227 AclException.class, 3228 FileNotFoundException.class, 3229 NSQuotaExceededException.class, 3230 SafeModeException.class, 3231 SnapshotAccessControlException.class, 3232 UnresolvedPathException.class); 3233 } finally { 3234 scope.close(); 3235 } 3236 } 3237 setAcl(String src, List<AclEntry> aclSpec)3238 public void setAcl(String src, List<AclEntry> aclSpec) throws IOException { 3239 checkOpen(); 3240 TraceScope scope = Trace.startSpan("setAcl", traceSampler); 3241 try { 3242 namenode.setAcl(src, aclSpec); 3243 } catch(RemoteException re) { 3244 throw re.unwrapRemoteException(AccessControlException.class, 3245 AclException.class, 3246 FileNotFoundException.class, 3247 NSQuotaExceededException.class, 3248 SafeModeException.class, 3249 SnapshotAccessControlException.class, 3250 UnresolvedPathException.class); 3251 } finally { 3252 scope.close(); 3253 } 3254 } 3255 getAclStatus(String src)3256 public AclStatus getAclStatus(String src) throws IOException { 3257 checkOpen(); 3258 TraceScope scope = getPathTraceScope("getAclStatus", src); 3259 try { 3260 return namenode.getAclStatus(src); 3261 } catch(RemoteException re) { 3262 throw re.unwrapRemoteException(AccessControlException.class, 3263 AclException.class, 3264 FileNotFoundException.class, 3265 UnresolvedPathException.class); 3266 } finally { 3267 scope.close(); 3268 } 3269 } 3270 createEncryptionZone(String src, String keyName)3271 public void createEncryptionZone(String src, String keyName) 3272 throws IOException { 3273 checkOpen(); 3274 TraceScope scope = getPathTraceScope("createEncryptionZone", src); 3275 try { 3276 namenode.createEncryptionZone(src, keyName); 3277 } catch (RemoteException re) { 3278 throw re.unwrapRemoteException(AccessControlException.class, 3279 SafeModeException.class, 3280 UnresolvedPathException.class); 3281 } finally { 3282 scope.close(); 3283 } 3284 } 3285 getEZForPath(String src)3286 public EncryptionZone getEZForPath(String src) 3287 throws IOException { 3288 checkOpen(); 3289 TraceScope scope = getPathTraceScope("getEZForPath", src); 3290 try { 3291 return namenode.getEZForPath(src); 3292 } catch (RemoteException re) { 3293 throw re.unwrapRemoteException(AccessControlException.class, 3294 UnresolvedPathException.class); 3295 } finally { 3296 scope.close(); 3297 } 3298 } 3299 listEncryptionZones()3300 public RemoteIterator<EncryptionZone> listEncryptionZones() 3301 throws IOException { 3302 checkOpen(); 3303 return new EncryptionZoneIterator(namenode, traceSampler); 3304 } 3305 setXAttr(String src, String name, byte[] value, EnumSet<XAttrSetFlag> flag)3306 public void setXAttr(String src, String name, byte[] value, 3307 EnumSet<XAttrSetFlag> flag) throws IOException { 3308 checkOpen(); 3309 TraceScope scope = getPathTraceScope("setXAttr", src); 3310 try { 3311 namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag); 3312 } catch (RemoteException re) { 3313 throw re.unwrapRemoteException(AccessControlException.class, 3314 FileNotFoundException.class, 3315 NSQuotaExceededException.class, 3316 SafeModeException.class, 3317 SnapshotAccessControlException.class, 3318 UnresolvedPathException.class); 3319 } finally { 3320 scope.close(); 3321 } 3322 } 3323 getXAttr(String src, String name)3324 public byte[] getXAttr(String src, String name) throws IOException { 3325 checkOpen(); 3326 TraceScope scope = getPathTraceScope("getXAttr", src); 3327 try { 3328 final List<XAttr> xAttrs = XAttrHelper.buildXAttrAsList(name); 3329 final List<XAttr> result = namenode.getXAttrs(src, xAttrs); 3330 return XAttrHelper.getFirstXAttrValue(result); 3331 } catch(RemoteException re) { 3332 throw re.unwrapRemoteException(AccessControlException.class, 3333 FileNotFoundException.class, 3334 UnresolvedPathException.class); 3335 } finally { 3336 scope.close(); 3337 } 3338 } 3339 getXAttrs(String src)3340 public Map<String, byte[]> getXAttrs(String src) throws IOException { 3341 checkOpen(); 3342 TraceScope scope = getPathTraceScope("getXAttrs", src); 3343 try { 3344 return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null)); 3345 } catch(RemoteException re) { 3346 throw re.unwrapRemoteException(AccessControlException.class, 3347 FileNotFoundException.class, 3348 UnresolvedPathException.class); 3349 } finally { 3350 scope.close(); 3351 } 3352 } 3353 getXAttrs(String src, List<String> names)3354 public Map<String, byte[]> getXAttrs(String src, List<String> names) 3355 throws IOException { 3356 checkOpen(); 3357 TraceScope scope = getPathTraceScope("getXAttrs", src); 3358 try { 3359 return XAttrHelper.buildXAttrMap(namenode.getXAttrs( 3360 src, XAttrHelper.buildXAttrs(names))); 3361 } catch(RemoteException re) { 3362 throw re.unwrapRemoteException(AccessControlException.class, 3363 FileNotFoundException.class, 3364 UnresolvedPathException.class); 3365 } finally { 3366 scope.close(); 3367 } 3368 } 3369 listXAttrs(String src)3370 public List<String> listXAttrs(String src) 3371 throws IOException { 3372 checkOpen(); 3373 TraceScope scope = getPathTraceScope("listXAttrs", src); 3374 try { 3375 final Map<String, byte[]> xattrs = 3376 XAttrHelper.buildXAttrMap(namenode.listXAttrs(src)); 3377 return Lists.newArrayList(xattrs.keySet()); 3378 } catch(RemoteException re) { 3379 throw re.unwrapRemoteException(AccessControlException.class, 3380 FileNotFoundException.class, 3381 UnresolvedPathException.class); 3382 } finally { 3383 scope.close(); 3384 } 3385 } 3386 removeXAttr(String src, String name)3387 public void removeXAttr(String src, String name) throws IOException { 3388 checkOpen(); 3389 TraceScope scope = getPathTraceScope("removeXAttr", src); 3390 try { 3391 namenode.removeXAttr(src, XAttrHelper.buildXAttr(name)); 3392 } catch(RemoteException re) { 3393 throw re.unwrapRemoteException(AccessControlException.class, 3394 FileNotFoundException.class, 3395 NSQuotaExceededException.class, 3396 SafeModeException.class, 3397 SnapshotAccessControlException.class, 3398 UnresolvedPathException.class); 3399 } finally { 3400 scope.close(); 3401 } 3402 } 3403 checkAccess(String src, FsAction mode)3404 public void checkAccess(String src, FsAction mode) throws IOException { 3405 checkOpen(); 3406 TraceScope scope = getPathTraceScope("checkAccess", src); 3407 try { 3408 namenode.checkAccess(src, mode); 3409 } catch (RemoteException re) { 3410 throw re.unwrapRemoteException(AccessControlException.class, 3411 FileNotFoundException.class, 3412 UnresolvedPathException.class); 3413 } finally { 3414 scope.close(); 3415 } 3416 } 3417 getInotifyEventStream()3418 public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { 3419 return new DFSInotifyEventInputStream(traceSampler, namenode); 3420 } 3421 getInotifyEventStream(long lastReadTxid)3422 public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) 3423 throws IOException { 3424 return new DFSInotifyEventInputStream(traceSampler, namenode, lastReadTxid); 3425 } 3426 3427 @Override // RemotePeerFactory newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)3428 public Peer newConnectedPeer(InetSocketAddress addr, 3429 Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) 3430 throws IOException { 3431 Peer peer = null; 3432 boolean success = false; 3433 Socket sock = null; 3434 try { 3435 sock = socketFactory.createSocket(); 3436 NetUtils.connect(sock, addr, 3437 getRandomLocalInterfaceAddr(), 3438 dfsClientConf.socketTimeout); 3439 peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, 3440 blockToken, datanodeId); 3441 peer.setReadTimeout(dfsClientConf.socketTimeout); 3442 success = true; 3443 return peer; 3444 } finally { 3445 if (!success) { 3446 IOUtils.cleanup(LOG, peer); 3447 IOUtils.closeSocket(sock); 3448 } 3449 } 3450 } 3451 3452 /** 3453 * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if 3454 * it does not already exist. 3455 * @param num Number of threads for hedged reads thread pool. 3456 * If zero, skip hedged reads thread pool creation. 3457 */ initThreadsNumForHedgedReads(int num)3458 private synchronized void initThreadsNumForHedgedReads(int num) { 3459 if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return; 3460 HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, 3461 TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 3462 new Daemon.DaemonFactory() { 3463 private final AtomicInteger threadIndex = 3464 new AtomicInteger(0); 3465 @Override 3466 public Thread newThread(Runnable r) { 3467 Thread t = super.newThread(r); 3468 t.setName("hedgedRead-" + 3469 threadIndex.getAndIncrement()); 3470 return t; 3471 } 3472 }, 3473 new ThreadPoolExecutor.CallerRunsPolicy() { 3474 3475 @Override 3476 public void rejectedExecution(Runnable runnable, 3477 ThreadPoolExecutor e) { 3478 LOG.info("Execution rejected, Executing in current thread"); 3479 HEDGED_READ_METRIC.incHedgedReadOpsInCurThread(); 3480 // will run in the current thread 3481 super.rejectedExecution(runnable, e); 3482 } 3483 }); 3484 HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); 3485 if (LOG.isDebugEnabled()) { 3486 LOG.debug("Using hedged reads; pool threads=" + num); 3487 } 3488 } 3489 getHedgedReadTimeout()3490 long getHedgedReadTimeout() { 3491 return this.hedgedReadThresholdMillis; 3492 } 3493 3494 @VisibleForTesting setHedgedReadTimeout(long timeoutMillis)3495 void setHedgedReadTimeout(long timeoutMillis) { 3496 this.hedgedReadThresholdMillis = timeoutMillis; 3497 } 3498 getHedgedReadsThreadPool()3499 ThreadPoolExecutor getHedgedReadsThreadPool() { 3500 return HEDGED_READ_THREAD_POOL; 3501 } 3502 isHedgedReadsEnabled()3503 boolean isHedgedReadsEnabled() { 3504 return (HEDGED_READ_THREAD_POOL != null) && 3505 HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0; 3506 } 3507 getHedgedReadMetrics()3508 DFSHedgedReadMetrics getHedgedReadMetrics() { 3509 return HEDGED_READ_METRIC; 3510 } 3511 getKeyProvider()3512 public KeyProvider getKeyProvider() { 3513 return clientContext.getKeyProviderCache().get(conf); 3514 } 3515 3516 @VisibleForTesting setKeyProvider(KeyProvider provider)3517 public void setKeyProvider(KeyProvider provider) { 3518 try { 3519 clientContext.getKeyProviderCache().setKeyProvider(conf, provider); 3520 } catch (IOException e) { 3521 LOG.error("Could not set KeyProvider !!", e); 3522 } 3523 } 3524 3525 /** 3526 * Probe for encryption enabled on this filesystem. 3527 * See {@link DFSUtil#isHDFSEncryptionEnabled(Configuration)} 3528 * @return true if encryption is enabled 3529 */ isHDFSEncryptionEnabled()3530 public boolean isHDFSEncryptionEnabled() { 3531 return DFSUtil.isHDFSEncryptionEnabled(this.conf); 3532 } 3533 3534 /** 3535 * Returns the SaslDataTransferClient configured for this DFSClient. 3536 * 3537 * @return SaslDataTransferClient configured for this DFSClient 3538 */ getSaslDataTransferClient()3539 public SaslDataTransferClient getSaslDataTransferClient() { 3540 return saslClient; 3541 } 3542 3543 private static final byte[] PATH = "path".getBytes(Charset.forName("UTF-8")); 3544 getPathTraceScope(String description, String path)3545 TraceScope getPathTraceScope(String description, String path) { 3546 TraceScope scope = Trace.startSpan(description, traceSampler); 3547 Span span = scope.getSpan(); 3548 if (span != null) { 3549 if (path != null) { 3550 span.addKVAnnotation(PATH, 3551 path.getBytes(Charset.forName("UTF-8"))); 3552 } 3553 } 3554 return scope; 3555 } 3556 3557 private static final byte[] SRC = "src".getBytes(Charset.forName("UTF-8")); 3558 3559 private static final byte[] DST = "dst".getBytes(Charset.forName("UTF-8")); 3560 getSrcDstTraceScope(String description, String src, String dst)3561 TraceScope getSrcDstTraceScope(String description, String src, String dst) { 3562 TraceScope scope = Trace.startSpan(description, traceSampler); 3563 Span span = scope.getSpan(); 3564 if (span != null) { 3565 if (src != null) { 3566 span.addKVAnnotation(SRC, 3567 src.getBytes(Charset.forName("UTF-8"))); 3568 } 3569 if (dst != null) { 3570 span.addKVAnnotation(DST, 3571 dst.getBytes(Charset.forName("UTF-8"))); 3572 } 3573 } 3574 return scope; 3575 } 3576 } 3577