1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hdfs; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.Charsets; 23 import com.google.common.base.Joiner; 24 import com.google.common.base.Preconditions; 25 import com.google.common.base.Supplier; 26 import com.google.common.collect.Lists; 27 import com.google.common.collect.Maps; 28 29 import org.apache.commons.io.FileUtils; 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 import org.apache.hadoop.conf.Configuration; 33 import org.apache.hadoop.crypto.key.KeyProvider; 34 import org.apache.hadoop.fs.BlockLocation; 35 import org.apache.hadoop.fs.CacheFlag; 36 import org.apache.hadoop.fs.CommonConfigurationKeys; 37 import org.apache.hadoop.fs.CreateFlag; 38 import org.apache.hadoop.fs.FileContext; 39 import org.apache.hadoop.fs.FileSystem; 40 import org.apache.hadoop.fs.FileSystem.Statistics; 41 import org.apache.hadoop.fs.FSDataInputStream; 42 import org.apache.hadoop.fs.FSDataOutputStream; 43 import org.apache.hadoop.fs.FsShell; 44 import org.apache.hadoop.fs.Options.Rename; 45 import org.apache.hadoop.fs.Path; 46 import 
org.apache.hadoop.fs.permission.AclEntry; 47 import org.apache.hadoop.fs.permission.AclEntryScope; 48 import org.apache.hadoop.fs.permission.AclEntryType; 49 import org.apache.hadoop.fs.permission.FsAction; 50 import org.apache.hadoop.fs.permission.FsPermission; 51 import org.apache.hadoop.fs.StorageType; 52 import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo; 53 import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 54 import org.apache.hadoop.hdfs.protocol.*; 55 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; 56 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; 57 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 58 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 59 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; 60 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; 61 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; 62 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; 63 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; 64 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; 65 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; 66 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; 67 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; 68 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; 69 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; 70 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; 71 import org.apache.hadoop.hdfs.server.common.StorageInfo; 72 import org.apache.hadoop.hdfs.server.datanode.DataNode; 73 import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion; 74 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw; 75 import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 76 import org.apache.hadoop.hdfs.server.namenode.FSEditLog; 77 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; 78 import org.apache.hadoop.hdfs.server.namenode.INodeFile; 79 import org.apache.hadoop.hdfs.server.namenode.LeaseManager; 80 import org.apache.hadoop.hdfs.server.namenode.NameNode; 81 import org.apache.hadoop.hdfs.server.namenode.ha 82 .ConfiguredFailoverProxyProvider; 83 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; 84 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; 85 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; 86 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; 87 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; 88 import org.apache.hadoop.hdfs.tools.DFSAdmin; 89 import org.apache.hadoop.io.IOUtils; 90 import org.apache.hadoop.io.nativeio.NativeIO; 91 import org.apache.hadoop.net.NetUtils; 92 import org.apache.hadoop.net.unix.DomainSocket; 93 import org.apache.hadoop.net.unix.TemporarySocketDirectory; 94 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; 95 import org.apache.hadoop.security.UserGroupInformation; 96 import org.apache.hadoop.security.token.Token; 97 import org.apache.hadoop.test.GenericTestUtils; 98 import org.apache.hadoop.util.StringUtils; 99 import org.apache.hadoop.util.Time; 100 import org.apache.hadoop.util.Tool; 101 import org.apache.hadoop.util.VersionInfo; 102 import org.apache.log4j.Level; 103 import org.junit.Assume; 104 import org.mockito.internal.util.reflection.Whitebox; 105 106 import java.io.*; 107 import java.lang.reflect.Field; 108 import java.lang.reflect.Modifier; 109 import java.net.*; 110 import java.nio.ByteBuffer; 111 import java.security.NoSuchAlgorithmException; 112 import java.security.PrivilegedExceptionAction; 113 import java.util.*; 114 import java.util.concurrent.TimeoutException; 115 116 import 
static org.apache.hadoop.hdfs.DFSConfigKeys.*; 117 import static org.apache.hadoop.fs.CreateFlag.CREATE; 118 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; 119 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; 120 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; 121 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; 122 import static org.junit.Assert.assertEquals; 123 import static org.junit.Assert.assertTrue; 124 import static org.junit.Assert.fail; 125 126 /** Utilities for HDFS tests */ 127 public class DFSTestUtil { 128 129 private static final Log LOG = LogFactory.getLog(DFSTestUtil.class); 130 131 private static final Random gen = new Random(); 132 private static final String[] dirNames = { 133 "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" 134 }; 135 136 private final int maxLevels; 137 private final int maxSize; 138 private final int minSize; 139 private final int nFiles; 140 private MyFile[] files; 141 142 /** Creates a new instance of DFSTestUtil 143 * 144 * @param nFiles Number of files to be created 145 * @param maxLevels Maximum number of directory levels 146 * @param maxSize Maximum size for file 147 * @param minSize Minimum size for file 148 */ DFSTestUtil(int nFiles, int maxLevels, int maxSize, int minSize)149 private DFSTestUtil(int nFiles, int maxLevels, int maxSize, int minSize) { 150 this.nFiles = nFiles; 151 this.maxLevels = maxLevels; 152 this.maxSize = maxSize; 153 this.minSize = minSize; 154 } 155 156 /** Creates a new instance of DFSTestUtil 157 * 158 * @param testName Name of the test from where this utility is used 159 * @param nFiles Number of files to be created 160 * @param maxLevels Maximum number of directory levels 161 * @param maxSize Maximum size for file 162 * @param minSize Minimum size for file 163 */ DFSTestUtil(String testName, int nFiles, int maxLevels, int maxSize, int minSize)164 public 
DFSTestUtil(String testName, int nFiles, int maxLevels, int maxSize, 165 int minSize) { 166 this.nFiles = nFiles; 167 this.maxLevels = maxLevels; 168 this.maxSize = maxSize; 169 this.minSize = minSize; 170 } 171 172 /** 173 * when formatting a namenode - we must provide clusterid. 174 * @param conf 175 * @throws IOException 176 */ formatNameNode(Configuration conf)177 public static void formatNameNode(Configuration conf) throws IOException { 178 String clusterId = StartupOption.FORMAT.getClusterId(); 179 if(clusterId == null || clusterId.isEmpty()) 180 StartupOption.FORMAT.setClusterId("testClusterID"); 181 // Use a copy of conf as it can be altered by namenode during format. 182 NameNode.format(new Configuration(conf)); 183 } 184 185 /** 186 * Create a new HA-enabled configuration. 187 */ newHAConfiguration(final String logicalName)188 public static Configuration newHAConfiguration(final String logicalName) { 189 Configuration conf = new Configuration(); 190 addHAConfiguration(conf, logicalName); 191 return conf; 192 } 193 194 /** 195 * Add a new HA configuration. 196 */ addHAConfiguration(Configuration conf, final String logicalName)197 public static void addHAConfiguration(Configuration conf, 198 final String logicalName) { 199 String nsIds = conf.get(DFSConfigKeys.DFS_NAMESERVICES); 200 if (nsIds == null) { 201 conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName); 202 } else { // append the nsid 203 conf.set(DFSConfigKeys.DFS_NAMESERVICES, nsIds + "," + logicalName); 204 } 205 conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, 206 logicalName), "nn1,nn2"); 207 conf.set(DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "" + 208 "." 
+ logicalName, 209 ConfiguredFailoverProxyProvider.class.getName()); 210 conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); 211 } 212 setFakeHttpAddresses(Configuration conf, final String logicalName)213 public static void setFakeHttpAddresses(Configuration conf, 214 final String logicalName) { 215 conf.set(DFSUtil.addKeySuffixes( 216 DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, 217 logicalName, "nn1"), "127.0.0.1:12345"); 218 conf.set(DFSUtil.addKeySuffixes( 219 DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, 220 logicalName, "nn2"), "127.0.0.1:12346"); 221 } 222 setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog)223 public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) { 224 Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog); 225 Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog); 226 } 227 228 /** class MyFile contains enough information to recreate the contents of 229 * a single file. 230 */ 231 private class MyFile { 232 233 private String name = ""; 234 private final int size; 235 private final long seed; 236 MyFile()237 MyFile() { 238 int nLevels = gen.nextInt(maxLevels); 239 if (nLevels != 0) { 240 int[] levels = new int[nLevels]; 241 for (int idx = 0; idx < nLevels; idx++) { 242 levels[idx] = gen.nextInt(10); 243 } 244 StringBuffer sb = new StringBuffer(); 245 for (int idx = 0; idx < nLevels; idx++) { 246 sb.append(dirNames[levels[idx]]); 247 sb.append("/"); 248 } 249 name = sb.toString(); 250 } 251 long fidx = -1; 252 while (fidx < 0) { fidx = gen.nextLong(); } 253 name = name + Long.toString(fidx); 254 size = minSize + gen.nextInt(maxSize - minSize); 255 seed = gen.nextLong(); 256 } 257 getName()258 String getName() { return name; } getSize()259 int getSize() { return size; } getSeed()260 long getSeed() { return seed; } 261 } 262 createFiles(FileSystem fs, String topdir)263 public void createFiles(FileSystem fs, String topdir) throws IOException { 264 createFiles(fs, topdir, (short)3); 265 } 266 267 /** 
create nFiles with random names and directory hierarchies 268 * with random (but reproducible) data in them. 269 */ createFiles(FileSystem fs, String topdir, short replicationFactor)270 public void createFiles(FileSystem fs, String topdir, 271 short replicationFactor) throws IOException { 272 files = new MyFile[nFiles]; 273 274 for (int idx = 0; idx < nFiles; idx++) { 275 files[idx] = new MyFile(); 276 } 277 278 Path root = new Path(topdir); 279 280 for (int idx = 0; idx < nFiles; idx++) { 281 createFile(fs, new Path(root, files[idx].getName()), files[idx].getSize(), 282 replicationFactor, files[idx].getSeed()); 283 } 284 } 285 readFile(FileSystem fs, Path fileName)286 public static String readFile(FileSystem fs, Path fileName) 287 throws IOException { 288 byte buf[] = readFileBuffer(fs, fileName); 289 return new String(buf, 0, buf.length); 290 } 291 readFileBuffer(FileSystem fs, Path fileName)292 public static byte[] readFileBuffer(FileSystem fs, Path fileName) 293 throws IOException { 294 ByteArrayOutputStream os = new ByteArrayOutputStream(); 295 try { 296 FSDataInputStream in = fs.open(fileName); 297 try { 298 IOUtils.copyBytes(in, os, 1024, true); 299 return os.toByteArray(); 300 } finally { 301 in.close(); 302 } 303 } finally { 304 os.close(); 305 } 306 } 307 createFile(FileSystem fs, Path fileName, long fileLen, short replFactor, long seed)308 public static void createFile(FileSystem fs, Path fileName, long fileLen, 309 short replFactor, long seed) throws IOException { 310 if (!fs.mkdirs(fileName.getParent())) { 311 throw new IOException("Mkdirs failed to create " + 312 fileName.getParent().toString()); 313 } 314 FSDataOutputStream out = null; 315 try { 316 out = fs.create(fileName, replFactor); 317 byte[] toWrite = new byte[1024]; 318 Random rb = new Random(seed); 319 long bytesToWrite = fileLen; 320 while (bytesToWrite>0) { 321 rb.nextBytes(toWrite); 322 int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite; 323 324 out.write(toWrite, 0, 
bytesToWriteNext); 325 bytesToWrite -= bytesToWriteNext; 326 } 327 out.close(); 328 out = null; 329 } finally { 330 IOUtils.closeStream(out); 331 } 332 } 333 createFile(FileSystem fs, Path fileName, int bufferLen, long fileLen, long blockSize, short replFactor, long seed)334 public static void createFile(FileSystem fs, Path fileName, int bufferLen, 335 long fileLen, long blockSize, short replFactor, long seed) 336 throws IOException { 337 createFile(fs, fileName, false, bufferLen, fileLen, blockSize, replFactor, 338 seed, false); 339 } 340 createFile(FileSystem fs, Path fileName, boolean isLazyPersist, int bufferLen, long fileLen, long blockSize, short replFactor, long seed, boolean flush)341 public static void createFile(FileSystem fs, Path fileName, 342 boolean isLazyPersist, int bufferLen, long fileLen, long blockSize, 343 short replFactor, long seed, boolean flush) throws IOException { 344 createFile(fs, fileName, isLazyPersist, bufferLen, fileLen, blockSize, 345 replFactor, seed, flush, null); 346 } 347 createFile(FileSystem fs, Path fileName, boolean isLazyPersist, int bufferLen, long fileLen, long blockSize, short replFactor, long seed, boolean flush, InetSocketAddress[] favoredNodes)348 public static void createFile(FileSystem fs, Path fileName, 349 boolean isLazyPersist, int bufferLen, long fileLen, long blockSize, 350 short replFactor, long seed, boolean flush, 351 InetSocketAddress[] favoredNodes) throws IOException { 352 assert bufferLen > 0; 353 if (!fs.mkdirs(fileName.getParent())) { 354 throw new IOException("Mkdirs failed to create " + 355 fileName.getParent().toString()); 356 } 357 FSDataOutputStream out = null; 358 EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE); 359 createFlags.add(OVERWRITE); 360 if (isLazyPersist) { 361 createFlags.add(LAZY_PERSIST); 362 } 363 try { 364 if (favoredNodes == null) { 365 out = fs.create( 366 fileName, 367 FsPermission.getFileDefault(), 368 createFlags, 369 fs.getConf().getInt( 370 
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), 371 replFactor, blockSize, null); 372 } else { 373 out = ((DistributedFileSystem) fs).create(fileName, 374 FsPermission.getDefault(), true, bufferLen, replFactor, blockSize, 375 null, favoredNodes); 376 } 377 if (fileLen > 0) { 378 byte[] toWrite = new byte[bufferLen]; 379 Random rb = new Random(seed); 380 long bytesToWrite = fileLen; 381 while (bytesToWrite>0) { 382 rb.nextBytes(toWrite); 383 int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen 384 : (int) bytesToWrite; 385 386 out.write(toWrite, 0, bytesToWriteNext); 387 bytesToWrite -= bytesToWriteNext; 388 } 389 if (flush) { 390 out.hsync(); 391 } 392 } 393 } finally { 394 if (out != null) { 395 out.close(); 396 } 397 } 398 } 399 calculateFileContentsFromSeed(long seed, int length)400 public static byte[] calculateFileContentsFromSeed(long seed, int length) { 401 Random rb = new Random(seed); 402 byte val[] = new byte[length]; 403 rb.nextBytes(val); 404 return val; 405 } 406 407 /** check if the files have been copied correctly. 
*/ checkFiles(FileSystem fs, String topdir)408 public boolean checkFiles(FileSystem fs, String topdir) throws IOException { 409 Path root = new Path(topdir); 410 411 for (int idx = 0; idx < nFiles; idx++) { 412 Path fPath = new Path(root, files[idx].getName()); 413 FSDataInputStream in = fs.open(fPath); 414 byte[] toRead = new byte[files[idx].getSize()]; 415 byte[] toCompare = new byte[files[idx].getSize()]; 416 Random rb = new Random(files[idx].getSeed()); 417 rb.nextBytes(toCompare); 418 in.readFully(0, toRead); 419 in.close(); 420 for (int i = 0; i < toRead.length; i++) { 421 if (toRead[i] != toCompare[i]) { 422 return false; 423 } 424 } 425 toRead = null; 426 toCompare = null; 427 } 428 429 return true; 430 } 431 setReplication(FileSystem fs, String topdir, short value)432 void setReplication(FileSystem fs, String topdir, short value) 433 throws IOException { 434 Path root = new Path(topdir); 435 for (int idx = 0; idx < nFiles; idx++) { 436 Path fPath = new Path(root, files[idx].getName()); 437 fs.setReplication(fPath, value); 438 } 439 } 440 441 /* 442 * Waits for the replication factor of all files to reach the 443 * specified target. 444 */ waitReplication(FileSystem fs, String topdir, short value)445 public void waitReplication(FileSystem fs, String topdir, short value) 446 throws IOException, InterruptedException, TimeoutException { 447 Path root = new Path(topdir); 448 449 /** wait for the replication factor to settle down */ 450 for (int idx = 0; idx < nFiles; idx++) { 451 waitReplication(fs, new Path(root, files[idx].getName()), value); 452 } 453 } 454 455 /* 456 * Check if the given block in the given file is corrupt. 
457 */ allBlockReplicasCorrupt(MiniDFSCluster cluster, Path file, int blockNo)458 public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster, 459 Path file, int blockNo) throws IOException { 460 DFSClient client = new DFSClient(new InetSocketAddress("localhost", 461 cluster.getNameNodePort()), cluster.getConfiguration(0)); 462 LocatedBlocks blocks; 463 try { 464 blocks = client.getNamenode().getBlockLocations( 465 file.toString(), 0, Long.MAX_VALUE); 466 } finally { 467 client.close(); 468 } 469 return blocks.get(blockNo).isCorrupt(); 470 } 471 472 /* 473 * Wait up to 20s for the given block to be replicated across 474 * the requested number of racks, with the requested number of 475 * replicas, and the requested number of replicas still needed. 476 */ waitForReplication(MiniDFSCluster cluster, ExtendedBlock b, int racks, int replicas, int neededReplicas)477 public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b, 478 int racks, int replicas, int neededReplicas) 479 throws TimeoutException, InterruptedException { 480 int curRacks = 0; 481 int curReplicas = 0; 482 int curNeededReplicas = 0; 483 int count = 0; 484 final int ATTEMPTS = 20; 485 486 do { 487 Thread.sleep(1000); 488 int[] r = BlockManagerTestUtil.getReplicaInfo(cluster.getNamesystem(), 489 b.getLocalBlock()); 490 curRacks = r[0]; 491 curReplicas = r[1]; 492 curNeededReplicas = r[2]; 493 count++; 494 } while ((curRacks != racks || 495 curReplicas != replicas || 496 curNeededReplicas != neededReplicas) && count < ATTEMPTS); 497 498 if (count == ATTEMPTS) { 499 throw new TimeoutException("Timed out waiting for replication." 
500 + " Needed replicas = "+neededReplicas 501 + " Cur needed replicas = "+curNeededReplicas 502 + " Replicas = "+replicas+" Cur replicas = "+curReplicas 503 + " Racks = "+racks+" Cur racks = "+curRacks); 504 } 505 } 506 507 /** 508 * Keep accessing the given file until the namenode reports that the 509 * given block in the file contains the given number of corrupt replicas. 510 */ waitCorruptReplicas(FileSystem fs, FSNamesystem ns, Path file, ExtendedBlock b, int corruptRepls)511 public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns, 512 Path file, ExtendedBlock b, int corruptRepls) 513 throws TimeoutException, InterruptedException { 514 int count = 0; 515 final int ATTEMPTS = 50; 516 int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock()); 517 while (repls != corruptRepls && count < ATTEMPTS) { 518 try { 519 IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), 520 512, true); 521 } catch (IOException e) { 522 // Swallow exceptions 523 } 524 System.out.println("Waiting for "+corruptRepls+" corrupt replicas"); 525 count++; 526 // check more often so corrupt block reports are not easily missed 527 for (int i = 0; i < 10; i++) { 528 repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock()); 529 Thread.sleep(100); 530 if (repls == corruptRepls) { 531 break; 532 } 533 } 534 } 535 if (count == ATTEMPTS) { 536 throw new TimeoutException("Timed out waiting for corrupt replicas." 
537 + " Waiting for "+corruptRepls+", but only found "+repls); 538 } 539 } 540 541 /* 542 * Wait up to 20s for the given DN (IP:port) to be decommissioned 543 */ waitForDecommission(FileSystem fs, String name)544 public static void waitForDecommission(FileSystem fs, String name) 545 throws IOException, InterruptedException, TimeoutException { 546 DatanodeInfo dn = null; 547 int count = 0; 548 final int ATTEMPTS = 20; 549 550 do { 551 Thread.sleep(1000); 552 DistributedFileSystem dfs = (DistributedFileSystem)fs; 553 for (DatanodeInfo info : dfs.getDataNodeStats()) { 554 if (name.equals(info.getXferAddr())) { 555 dn = info; 556 } 557 } 558 count++; 559 } while ((dn == null || 560 dn.isDecommissionInProgress() || 561 !dn.isDecommissioned()) && count < ATTEMPTS); 562 563 if (count == ATTEMPTS) { 564 throw new TimeoutException("Timed out waiting for datanode " 565 + name + " to decommission."); 566 } 567 } 568 569 /* 570 * Returns the index of the first datanode which has a copy 571 * of the given block, or -1 if no such datanode exists. 572 */ firstDnWithBlock(MiniDFSCluster cluster, ExtendedBlock b)573 public static int firstDnWithBlock(MiniDFSCluster cluster, ExtendedBlock b) 574 throws IOException { 575 int numDatanodes = cluster.getDataNodes().size(); 576 for (int i = 0; i < numDatanodes; i++) { 577 String blockContent = cluster.readBlockOnDataNode(i, b); 578 if (blockContent != null) { 579 return i; 580 } 581 } 582 return -1; 583 } 584 585 /* 586 * Return the total capacity of all live DNs. 587 */ getLiveDatanodeCapacity(DatanodeManager dm)588 public static long getLiveDatanodeCapacity(DatanodeManager dm) { 589 final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); 590 dm.fetchDatanodes(live, null, false); 591 long capacity = 0; 592 for (final DatanodeDescriptor dn : live) { 593 capacity += dn.getCapacity(); 594 } 595 return capacity; 596 } 597 598 /* 599 * Return the capacity of the given live DN. 
600 */ getDatanodeCapacity(DatanodeManager dm, int index)601 public static long getDatanodeCapacity(DatanodeManager dm, int index) { 602 final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); 603 dm.fetchDatanodes(live, null, false); 604 return live.get(index).getCapacity(); 605 } 606 607 /* 608 * Wait for the given # live/dead DNs, total capacity, and # vol failures. 609 */ waitForDatanodeStatus(DatanodeManager dm, int expectedLive, int expectedDead, long expectedVolFails, long expectedTotalCapacity, long timeout)610 public static void waitForDatanodeStatus(DatanodeManager dm, int expectedLive, 611 int expectedDead, long expectedVolFails, long expectedTotalCapacity, 612 long timeout) throws InterruptedException, TimeoutException { 613 final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); 614 final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>(); 615 final int ATTEMPTS = 10; 616 int count = 0; 617 long currTotalCapacity = 0; 618 int volFails = 0; 619 620 do { 621 Thread.sleep(timeout); 622 live.clear(); 623 dead.clear(); 624 dm.fetchDatanodes(live, dead, false); 625 currTotalCapacity = 0; 626 volFails = 0; 627 for (final DatanodeDescriptor dd : live) { 628 currTotalCapacity += dd.getCapacity(); 629 volFails += dd.getVolumeFailures(); 630 } 631 count++; 632 } while ((expectedLive != live.size() || 633 expectedDead != dead.size() || 634 expectedTotalCapacity != currTotalCapacity || 635 expectedVolFails != volFails) 636 && count < ATTEMPTS); 637 638 if (count == ATTEMPTS) { 639 throw new TimeoutException("Timed out waiting for capacity." 640 + " Live = "+live.size()+" Expected = "+expectedLive 641 + " Dead = "+dead.size()+" Expected = "+expectedDead 642 + " Total capacity = "+currTotalCapacity 643 + " Expected = "+expectedTotalCapacity 644 + " Vol Fails = "+volFails+" Expected = "+expectedVolFails); 645 } 646 } 647 648 /* 649 * Wait for the given DN to consider itself dead. 
650 */ waitForDatanodeDeath(DataNode dn)651 public static void waitForDatanodeDeath(DataNode dn) 652 throws InterruptedException, TimeoutException { 653 final int ATTEMPTS = 10; 654 int count = 0; 655 do { 656 Thread.sleep(1000); 657 count++; 658 } while (dn.isDatanodeUp() && count < ATTEMPTS); 659 660 if (count == ATTEMPTS) { 661 throw new TimeoutException("Timed out waiting for DN to die"); 662 } 663 } 664 665 /** return list of filenames created as part of createFiles */ getFileNames(String topDir)666 public String[] getFileNames(String topDir) { 667 if (nFiles == 0) 668 return new String[]{}; 669 else { 670 String[] fileNames = new String[nFiles]; 671 for (int idx=0; idx < nFiles; idx++) { 672 fileNames[idx] = topDir + "/" + files[idx].getName(); 673 } 674 return fileNames; 675 } 676 } 677 678 /** 679 * Wait for the given file to reach the given replication factor. 680 * @throws TimeoutException if we fail to sufficiently replicate the file 681 */ waitReplication(FileSystem fs, Path fileName, short replFactor)682 public static void waitReplication(FileSystem fs, Path fileName, short replFactor) 683 throws IOException, InterruptedException, TimeoutException { 684 boolean correctReplFactor; 685 final int ATTEMPTS = 40; 686 int count = 0; 687 688 do { 689 correctReplFactor = true; 690 BlockLocation locs[] = fs.getFileBlockLocations( 691 fs.getFileStatus(fileName), 0, Long.MAX_VALUE); 692 count++; 693 for (int j = 0; j < locs.length; j++) { 694 String[] hostnames = locs[j].getNames(); 695 if (hostnames.length != replFactor) { 696 correctReplFactor = false; 697 System.out.println("Block " + j + " of file " + fileName 698 + " has replication factor " + hostnames.length 699 + " (desired " + replFactor + "); locations " 700 + Joiner.on(' ').join(hostnames)); 701 Thread.sleep(1000); 702 break; 703 } 704 } 705 if (correctReplFactor) { 706 System.out.println("All blocks of file " + fileName 707 + " verified to have replication factor " + replFactor); 708 } 709 } while 
(!correctReplFactor && count < ATTEMPTS); 710 711 if (count == ATTEMPTS) { 712 throw new TimeoutException("Timed out waiting for " + fileName + 713 " to reach " + replFactor + " replicas"); 714 } 715 } 716 717 /** delete directory and everything underneath it.*/ cleanup(FileSystem fs, String topdir)718 public void cleanup(FileSystem fs, String topdir) throws IOException { 719 Path root = new Path(topdir); 720 fs.delete(root, true); 721 files = null; 722 } 723 getFirstBlock(FileSystem fs, Path path)724 public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException { 725 HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path); 726 try { 727 in.readByte(); 728 return in.getCurrentBlock(); 729 } finally { 730 in.close(); 731 } 732 } 733 getAllBlocks(FSDataInputStream in)734 public static List<LocatedBlock> getAllBlocks(FSDataInputStream in) 735 throws IOException { 736 return ((HdfsDataInputStream) in).getAllBlocks(); 737 } 738 getAllBlocks(FileSystem fs, Path path)739 public static List<LocatedBlock> getAllBlocks(FileSystem fs, Path path) 740 throws IOException { 741 HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path); 742 return in.getAllBlocks(); 743 } 744 getBlockToken( FSDataOutputStream out)745 public static Token<BlockTokenIdentifier> getBlockToken( 746 FSDataOutputStream out) { 747 return ((DFSOutputStream) out.getWrappedStream()).getBlockToken(); 748 } 749 readFile(File f)750 public static String readFile(File f) throws IOException { 751 StringBuilder b = new StringBuilder(); 752 BufferedReader in = new BufferedReader(new FileReader(f)); 753 for(int c; (c = in.read()) != -1; b.append((char)c)); 754 in.close(); 755 return b.toString(); 756 } 757 758 /* Write the given string to the given file */ writeFile(FileSystem fs, Path p, String s)759 public static void writeFile(FileSystem fs, Path p, String s) 760 throws IOException { 761 if (fs.exists(p)) { 762 fs.delete(p, true); 763 } 764 InputStream is = new 
ByteArrayInputStream(s.getBytes()); 765 FSDataOutputStream os = fs.create(p); 766 IOUtils.copyBytes(is, os, s.length(), true); 767 } 768 769 /* Append the given string to the given file */ appendFile(FileSystem fs, Path p, String s)770 public static void appendFile(FileSystem fs, Path p, String s) 771 throws IOException { 772 assert fs.exists(p); 773 InputStream is = new ByteArrayInputStream(s.getBytes()); 774 FSDataOutputStream os = fs.append(p); 775 IOUtils.copyBytes(is, os, s.length(), true); 776 } 777 778 /** 779 * Append specified length of bytes to a given file 780 * @param fs The file system 781 * @param p Path of the file to append 782 * @param length Length of bytes to append to the file 783 * @throws IOException 784 */ appendFile(FileSystem fs, Path p, int length)785 public static void appendFile(FileSystem fs, Path p, int length) 786 throws IOException { 787 assert fs.exists(p); 788 assert length >= 0; 789 byte[] toAppend = new byte[length]; 790 Random random = new Random(); 791 random.nextBytes(toAppend); 792 FSDataOutputStream out = fs.append(p); 793 out.write(toAppend); 794 out.close(); 795 } 796 797 /** 798 * @return url content as string (UTF-8 encoding assumed) 799 */ urlGet(URL url)800 public static String urlGet(URL url) throws IOException { 801 return new String(urlGetBytes(url), Charsets.UTF_8); 802 } 803 804 /** 805 * @return URL contents as a byte array 806 */ urlGetBytes(URL url)807 public static byte[] urlGetBytes(URL url) throws IOException { 808 URLConnection conn = url.openConnection(); 809 HttpURLConnection hc = (HttpURLConnection)conn; 810 811 assertEquals(HttpURLConnection.HTTP_OK, hc.getResponseCode()); 812 ByteArrayOutputStream out = new ByteArrayOutputStream(); 813 IOUtils.copyBytes(conn.getInputStream(), out, 4096, true); 814 return out.toByteArray(); 815 } 816 817 /** 818 * mock class to get group mapping for fake users 819 * 820 */ 821 static class MockUnixGroupsMapping extends ShellBasedUnixGroupsMapping { 822 static 
Map<String, String []> fakeUser2GroupsMap; 823 private static final List<String> defaultGroups; 824 static { 825 defaultGroups = new ArrayList<String>(1); 826 defaultGroups.add("supergroup"); 827 fakeUser2GroupsMap = new HashMap<String, String[]>(); 828 } 829 830 @Override getGroups(String user)831 public List<String> getGroups(String user) throws IOException { 832 boolean found = false; 833 834 // check to see if this is one of fake users 835 List<String> l = new ArrayList<String>(); 836 for(String u : fakeUser2GroupsMap.keySet()) { 837 if(user.equals(u)) { 838 found = true; 839 for(String gr : fakeUser2GroupsMap.get(u)) { 840 l.add(gr); 841 } 842 } 843 } 844 845 // default 846 if(!found) { 847 l = super.getGroups(user); 848 if(l.size() == 0) { 849 System.out.println("failed to get real group for " + user + 850 "; using default"); 851 return defaultGroups; 852 } 853 } 854 return l; 855 } 856 } 857 858 /** 859 * update the configuration with fake class for mapping user to groups 860 * @param conf 861 * @param map - user to groups mapping 862 */ updateConfWithFakeGroupMapping(Configuration conf, Map<String, String []> map)863 static public void updateConfWithFakeGroupMapping 864 (Configuration conf, Map<String, String []> map) { 865 if(map!=null) { 866 MockUnixGroupsMapping.fakeUser2GroupsMap = map; 867 } 868 869 // fake mapping user to groups 870 conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, 871 DFSTestUtil.MockUnixGroupsMapping.class, 872 ShellBasedUnixGroupsMapping.class); 873 874 } 875 876 /** 877 * Get a FileSystem instance as specified user in a doAs block. 
878 */ getFileSystemAs(UserGroupInformation ugi, final Configuration conf)879 static public FileSystem getFileSystemAs(UserGroupInformation ugi, 880 final Configuration conf) throws IOException { 881 try { 882 return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { 883 @Override 884 public FileSystem run() throws Exception { 885 return FileSystem.get(conf); 886 } 887 }); 888 } catch (InterruptedException e) { 889 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 890 } 891 } 892 893 public static byte[] generateSequentialBytes(int start, int length) { 894 byte[] result = new byte[length]; 895 896 for (int i = 0; i < length; i++) { 897 result[i] = (byte) ((start + i) % 127); 898 } 899 900 return result; 901 } 902 903 public static Statistics getStatistics(FileSystem fs) { 904 return FileSystem.getStatistics(fs.getUri().getScheme(), fs.getClass()); 905 } 906 907 /** 908 * Load file into byte[] 909 */ 910 public static byte[] loadFile(String filename) throws IOException { 911 File file = new File(filename); 912 DataInputStream in = new DataInputStream(new FileInputStream(file)); 913 byte[] content = new byte[(int)file.length()]; 914 try { 915 in.readFully(content); 916 } finally { 917 IOUtils.cleanup(LOG, in); 918 } 919 return content; 920 } 921 922 /** For {@link TestTransferRbw} */ 923 public static BlockOpResponseProto transferRbw(final ExtendedBlock b, 924 final DFSClient dfsClient, final DatanodeInfo... 
datanodes) throws IOException { 925 assertEquals(2, datanodes.length); 926 final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0], 927 datanodes.length, dfsClient); 928 final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); 929 final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( 930 NetUtils.getOutputStream(s, writeTimeout), 931 HdfsConstants.SMALL_BUFFER_SIZE)); 932 final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); 933 934 // send the request 935 new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), 936 dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, 937 new StorageType[]{StorageType.DEFAULT}); 938 out.flush(); 939 940 return BlockOpResponseProto.parseDelimitedFrom(in); 941 } 942 943 public static void setFederatedConfiguration(MiniDFSCluster cluster, 944 Configuration conf) { 945 Set<String> nameservices = new HashSet<String>(); 946 for (NameNodeInfo info : cluster.getNameNodeInfos()) { 947 assert info.nameserviceId != null; 948 nameservices.add(info.nameserviceId); 949 conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, 950 info.nameserviceId), DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME, 951 info.nameNode.getNameNodeAddress()).toString()); 952 conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, 953 info.nameserviceId), DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME, 954 info.nameNode.getNameNodeAddress()).toString()); 955 } 956 conf.set(DFSConfigKeys.DFS_NAMESERVICES, Joiner.on(",") 957 .join(nameservices)); 958 } 959 960 public static void setFederatedHAConfiguration(MiniDFSCluster cluster, 961 Configuration conf) { 962 Map<String, List<String>> nameservices = Maps.newHashMap(); 963 for (NameNodeInfo info : cluster.getNameNodeInfos()) { 964 Preconditions.checkState(info.nameserviceId != null); 965 List<String> nns = nameservices.get(info.nameserviceId); 966 if (nns == null) { 967 nns = Lists.newArrayList(); 968 
nameservices.put(info.nameserviceId, nns); 969 } 970 nns.add(info.nnId); 971 972 conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, 973 info.nameserviceId, info.nnId), 974 DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME, 975 info.nameNode.getNameNodeAddress()).toString()); 976 conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, 977 info.nameserviceId, info.nnId), 978 DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME, 979 info.nameNode.getNameNodeAddress()).toString()); 980 } 981 for (Map.Entry<String, List<String>> entry : nameservices.entrySet()) { 982 conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, 983 entry.getKey()), Joiner.on(",").join(entry.getValue())); 984 conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + entry 985 .getKey(), ConfiguredFailoverProxyProvider.class.getName()); 986 } 987 conf.set(DFSConfigKeys.DFS_NAMESERVICES, Joiner.on(",") 988 .join(nameservices.keySet())); 989 } 990 991 private static DatanodeID getDatanodeID(String ipAddr) { 992 return new DatanodeID(ipAddr, "localhost", 993 UUID.randomUUID().toString(), 994 DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, 995 DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, 996 DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, 997 DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT); 998 } 999 1000 public static DatanodeID getLocalDatanodeID() { 1001 return getDatanodeID("127.0.0.1"); 1002 } 1003 1004 public static DatanodeID getLocalDatanodeID(int port) { 1005 return new DatanodeID("127.0.0.1", "localhost", 1006 UUID.randomUUID().toString(), 1007 port, port, port, port); 1008 } 1009 1010 public static DatanodeDescriptor getLocalDatanodeDescriptor() { 1011 return new DatanodeDescriptor(getLocalDatanodeID()); 1012 } 1013 1014 public static DatanodeInfo getLocalDatanodeInfo() { 1015 return new DatanodeInfo(getLocalDatanodeID()); 1016 } 1017 1018 public static DatanodeInfo getDatanodeInfo(String ipAddr) { 1019 return new DatanodeInfo(getDatanodeID(ipAddr)); 1020 } 1021 1022 
public static DatanodeInfo getLocalDatanodeInfo(int port) { 1023 return new DatanodeInfo(getLocalDatanodeID(port)); 1024 } 1025 1026 public static DatanodeInfo getDatanodeInfo(String ipAddr, 1027 String host, int port) { 1028 return new DatanodeInfo(new DatanodeID(ipAddr, host, 1029 UUID.randomUUID().toString(), port, 1030 DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, 1031 DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, 1032 DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT)); 1033 } 1034 1035 public static DatanodeInfo getLocalDatanodeInfo(String ipAddr, 1036 String hostname, AdminStates adminState) { 1037 return new DatanodeInfo(ipAddr, hostname, "", 1038 DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, 1039 DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, 1040 DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, 1041 DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT, 1042 1l, 2l, 3l, 4l, 0l, 0l, 0l, 5, 6, "local", adminState); 1043 } 1044 1045 public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, 1046 String rackLocation) { 1047 return getDatanodeDescriptor(ipAddr, DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, 1048 rackLocation); 1049 } 1050 1051 public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, 1052 String rackLocation, String hostname) { 1053 return getDatanodeDescriptor(ipAddr, 1054 DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname); 1055 } 1056 1057 public static DatanodeStorageInfo createDatanodeStorageInfo( 1058 String storageID, String ip) { 1059 return createDatanodeStorageInfo(storageID, ip, "defaultRack", "host"); 1060 } 1061 1062 public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) { 1063 return createDatanodeStorageInfos(racks, null); 1064 } 1065 1066 public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks, String[] hostnames) { 1067 return createDatanodeStorageInfos(racks.length, racks, hostnames); 1068 } 1069 1070 public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) { 
1071 return createDatanodeStorageInfos(n, null, null); 1072 } 1073 1074 public static DatanodeStorageInfo[] createDatanodeStorageInfos( 1075 int n, String[] racks, String[] hostnames) { 1076 return createDatanodeStorageInfos(n, racks, hostnames, null); 1077 } 1078 1079 public static DatanodeStorageInfo[] createDatanodeStorageInfos( 1080 int n, String[] racks, String[] hostnames, StorageType[] types) { 1081 DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n]; 1082 for(int i = storages.length; i > 0; ) { 1083 final String storageID = "s" + i; 1084 final String ip = i + "." + i + "." + i + "." + i; 1085 i--; 1086 final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack"; 1087 final String hostname = (hostnames!=null && i < hostnames.length)? hostnames[i]: "host"; 1088 final StorageType type = (types != null && i < types.length) ? types[i] 1089 : StorageType.DEFAULT; 1090 storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname, 1091 type); 1092 } 1093 return storages; 1094 } 1095 1096 public static DatanodeStorageInfo createDatanodeStorageInfo( 1097 String storageID, String ip, String rack, String hostname) { 1098 return createDatanodeStorageInfo(storageID, ip, rack, hostname, 1099 StorageType.DEFAULT); 1100 } 1101 1102 public static DatanodeStorageInfo createDatanodeStorageInfo( 1103 String storageID, String ip, String rack, String hostname, 1104 StorageType type) { 1105 final DatanodeStorage storage = new DatanodeStorage(storageID, 1106 DatanodeStorage.State.NORMAL, type); 1107 final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor( 1108 ip, rack, storage, hostname); 1109 return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage); 1110 } 1111 1112 public static DatanodeDescriptor[] toDatanodeDescriptor( 1113 DatanodeStorageInfo[] storages) { 1114 DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length]; 1115 for(int i = 0; i < datanodes.length; i++) { 1116 datanodes[i] = 
storages[i].getDatanodeDescriptor(); 1117 } 1118 return datanodes; 1119 } 1120 1121 public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, 1122 int port, String rackLocation, String hostname) { 1123 DatanodeID dnId = new DatanodeID(ipAddr, hostname, 1124 UUID.randomUUID().toString(), port, 1125 DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, 1126 DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, 1127 DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT); 1128 return new DatanodeDescriptor(dnId, rackLocation); 1129 } 1130 1131 public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, 1132 int port, String rackLocation) { 1133 return getDatanodeDescriptor(ipAddr, port, rackLocation, "host"); 1134 } 1135 1136 public static DatanodeRegistration getLocalDatanodeRegistration() { 1137 return new DatanodeRegistration(getLocalDatanodeID(), new StorageInfo( 1138 NodeType.DATA_NODE), new ExportedBlockKeys(), VersionInfo.getVersion()); 1139 } 1140 1141 /** Copy one file's contents into the other **/ 1142 public static void copyFile(File src, File dest) throws IOException { 1143 FileUtils.copyFile(src, dest); 1144 } 1145 1146 public static class Builder { 1147 private int maxLevels = 3; 1148 private int maxSize = 8*1024; 1149 private int minSize = 1; 1150 private int nFiles = 1; 1151 1152 public Builder() { 1153 } 1154 1155 public Builder setName(String string) { 1156 return this; 1157 } 1158 1159 public Builder setNumFiles(int nFiles) { 1160 this.nFiles = nFiles; 1161 return this; 1162 } 1163 1164 public Builder setMaxLevels(int maxLevels) { 1165 this.maxLevels = maxLevels; 1166 return this; 1167 } 1168 1169 public Builder setMaxSize(int maxSize) { 1170 this.maxSize = maxSize; 1171 return this; 1172 } 1173 1174 public Builder setMinSize(int minSize) { 1175 this.minSize = minSize; 1176 return this; 1177 } 1178 1179 public DFSTestUtil build() { 1180 return new DFSTestUtil(nFiles, maxLevels, maxSize, minSize); 1181 } 1182 } 1183 1184 /** 1185 * Run a set of 
operations and generate all edit logs 1186 */ 1187 public static void runOperations(MiniDFSCluster cluster, 1188 DistributedFileSystem filesystem, Configuration conf, long blockSize, 1189 int nnIndex) throws IOException { 1190 // create FileContext for rename2 1191 FileContext fc = FileContext.getFileContext(cluster.getURI(0), conf); 1192 1193 // OP_ADD 0 1194 final Path pathFileCreate = new Path("/file_create"); 1195 FSDataOutputStream s = filesystem.create(pathFileCreate); 1196 // OP_CLOSE 9 1197 s.close(); 1198 // OP_APPEND 47 1199 FSDataOutputStream s2 = filesystem.append(pathFileCreate, 4096, null); 1200 s2.close(); 1201 // OP_SET_STORAGE_POLICY 45 1202 filesystem.setStoragePolicy(pathFileCreate, 1203 HdfsConstants.HOT_STORAGE_POLICY_NAME); 1204 // OP_RENAME_OLD 1 1205 final Path pathFileMoved = new Path("/file_moved"); 1206 filesystem.rename(pathFileCreate, pathFileMoved); 1207 // OP_DELETE 2 1208 filesystem.delete(pathFileMoved, false); 1209 // OP_MKDIR 3 1210 Path pathDirectoryMkdir = new Path("/directory_mkdir"); 1211 filesystem.mkdirs(pathDirectoryMkdir); 1212 // OP_ALLOW_SNAPSHOT 29 1213 filesystem.allowSnapshot(pathDirectoryMkdir); 1214 // OP_DISALLOW_SNAPSHOT 30 1215 filesystem.disallowSnapshot(pathDirectoryMkdir); 1216 // OP_CREATE_SNAPSHOT 26 1217 String ssName = "snapshot1"; 1218 filesystem.allowSnapshot(pathDirectoryMkdir); 1219 filesystem.createSnapshot(pathDirectoryMkdir, ssName); 1220 // OP_RENAME_SNAPSHOT 28 1221 String ssNewName = "snapshot2"; 1222 filesystem.renameSnapshot(pathDirectoryMkdir, ssName, ssNewName); 1223 // OP_DELETE_SNAPSHOT 27 1224 filesystem.deleteSnapshot(pathDirectoryMkdir, ssNewName); 1225 // OP_SET_REPLICATION 4 1226 s = filesystem.create(pathFileCreate); 1227 s.close(); 1228 filesystem.setReplication(pathFileCreate, (short)1); 1229 // OP_SET_PERMISSIONS 7 1230 Short permission = 0777; 1231 filesystem.setPermission(pathFileCreate, new FsPermission(permission)); 1232 // OP_SET_OWNER 8 1233 
filesystem.setOwner(pathFileCreate, new String("newOwner"), null); 1234 // OP_CLOSE 9 see above 1235 // OP_SET_GENSTAMP 10 see above 1236 // OP_SET_NS_QUOTA 11 obsolete 1237 // OP_CLEAR_NS_QUOTA 12 obsolete 1238 // OP_TIMES 13 1239 long mtime = 1285195527000L; // Wed, 22 Sep 2010 22:45:27 GMT 1240 long atime = mtime; 1241 filesystem.setTimes(pathFileCreate, mtime, atime); 1242 // OP_SET_QUOTA 14 1243 filesystem.setQuota(pathDirectoryMkdir, 1000L, 1244 HdfsConstants.QUOTA_DONT_SET); 1245 // OP_SET_QUOTA_BY_STORAGETYPE 1246 filesystem.setQuotaByStorageType(pathDirectoryMkdir, StorageType.SSD, 888L); 1247 // OP_RENAME 15 1248 fc.rename(pathFileCreate, pathFileMoved, Rename.NONE); 1249 // OP_CONCAT_DELETE 16 1250 Path pathConcatTarget = new Path("/file_concat_target"); 1251 Path[] pathConcatFiles = new Path[2]; 1252 pathConcatFiles[0] = new Path("/file_concat_0"); 1253 pathConcatFiles[1] = new Path("/file_concat_1"); 1254 1255 long length = blockSize * 3; // multiple of blocksize for concat 1256 short replication = 1; 1257 long seed = 1; 1258 DFSTestUtil.createFile(filesystem, pathConcatTarget, length, replication, 1259 seed); 1260 DFSTestUtil.createFile(filesystem, pathConcatFiles[0], length, replication, 1261 seed); 1262 DFSTestUtil.createFile(filesystem, pathConcatFiles[1], length, replication, 1263 seed); 1264 filesystem.concat(pathConcatTarget, pathConcatFiles); 1265 1266 // OP_TRUNCATE 46 1267 length = blockSize * 2; 1268 DFSTestUtil.createFile(filesystem, pathFileCreate, length, replication, 1269 seed); 1270 filesystem.truncate(pathFileCreate, blockSize); 1271 1272 // OP_SYMLINK 17 1273 Path pathSymlink = new Path("/file_symlink"); 1274 fc.createSymlink(pathConcatTarget, pathSymlink, false); 1275 1276 // OP_REASSIGN_LEASE 22 1277 String filePath = "/hard-lease-recovery-test"; 1278 byte[] bytes = "foo-bar-baz".getBytes(); 1279 DFSClientAdapter.stopLeaseRenewer(filesystem); 1280 FSDataOutputStream leaseRecoveryPath = filesystem.create(new Path(filePath)); 1281 
leaseRecoveryPath.write(bytes); 1282 leaseRecoveryPath.hflush(); 1283 // Set the hard lease timeout to 1 second. 1284 cluster.setLeasePeriod(60 * 1000, 1000, nnIndex); 1285 // wait for lease recovery to complete 1286 LocatedBlocks locatedBlocks; 1287 do { 1288 try { 1289 Thread.sleep(1000); 1290 } catch (InterruptedException e) {} 1291 locatedBlocks = DFSClientAdapter.callGetBlockLocations( 1292 cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length); 1293 } while (locatedBlocks.isUnderConstruction()); 1294 // OP_ADD_CACHE_POOL 1295 filesystem.addCachePool(new CachePoolInfo("pool1")); 1296 // OP_MODIFY_CACHE_POOL 1297 filesystem.modifyCachePool(new CachePoolInfo("pool1").setLimit(99l)); 1298 // OP_ADD_PATH_BASED_CACHE_DIRECTIVE 1299 long id = filesystem.addCacheDirective( 1300 new CacheDirectiveInfo.Builder(). 1301 setPath(new Path("/path")). 1302 setReplication((short)1). 1303 setPool("pool1"). 1304 build(), EnumSet.of(CacheFlag.FORCE)); 1305 // OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE 1306 filesystem.modifyCacheDirective( 1307 new CacheDirectiveInfo.Builder(). 1308 setId(id). 1309 setReplication((short)2). 
1310 build(), EnumSet.of(CacheFlag.FORCE)); 1311 // OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE 1312 filesystem.removeCacheDirective(id); 1313 // OP_REMOVE_CACHE_POOL 1314 filesystem.removeCachePool("pool1"); 1315 // OP_SET_ACL 1316 List<AclEntry> aclEntryList = Lists.newArrayList(); 1317 aclEntryList.add( 1318 new AclEntry.Builder() 1319 .setPermission(FsAction.READ_WRITE) 1320 .setScope(AclEntryScope.ACCESS) 1321 .setType(AclEntryType.USER) 1322 .build()); 1323 aclEntryList.add( 1324 new AclEntry.Builder() 1325 .setName("user") 1326 .setPermission(FsAction.READ_WRITE) 1327 .setScope(AclEntryScope.ACCESS) 1328 .setType(AclEntryType.USER) 1329 .build()); 1330 aclEntryList.add( 1331 new AclEntry.Builder() 1332 .setPermission(FsAction.WRITE) 1333 .setScope(AclEntryScope.ACCESS) 1334 .setType(AclEntryType.GROUP) 1335 .build()); 1336 aclEntryList.add( 1337 new AclEntry.Builder() 1338 .setPermission(FsAction.NONE) 1339 .setScope(AclEntryScope.ACCESS) 1340 .setType(AclEntryType.OTHER) 1341 .build()); 1342 filesystem.setAcl(pathConcatTarget, aclEntryList); 1343 // OP_SET_XATTR 1344 filesystem.setXAttr(pathConcatTarget, "user.a1", 1345 new byte[]{0x31, 0x32, 0x33}); 1346 filesystem.setXAttr(pathConcatTarget, "user.a2", 1347 new byte[]{0x37, 0x38, 0x39}); 1348 // OP_REMOVE_XATTR 1349 filesystem.removeXAttr(pathConcatTarget, "user.a2"); 1350 } 1351 1352 public static void abortStream(DFSOutputStream out) throws IOException { 1353 out.abort(); 1354 } 1355 1356 public static byte[] asArray(ByteBuffer buf) { 1357 byte arr[] = new byte[buf.remaining()]; 1358 buf.duplicate().get(arr); 1359 return arr; 1360 } 1361 1362 /** 1363 * Blocks until cache usage hits the expected new value. 
1364 */ 1365 public static long verifyExpectedCacheUsage(final long expectedCacheUsed, 1366 final long expectedBlocks, final FsDatasetSpi<?> fsd) throws Exception { 1367 GenericTestUtils.waitFor(new Supplier<Boolean>() { 1368 private int tries = 0; 1369 1370 @Override 1371 public Boolean get() { 1372 long curCacheUsed = fsd.getCacheUsed(); 1373 long curBlocks = fsd.getNumBlocksCached(); 1374 if ((curCacheUsed != expectedCacheUsed) || 1375 (curBlocks != expectedBlocks)) { 1376 if (tries++ > 10) { 1377 LOG.info("verifyExpectedCacheUsage: have " + 1378 curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " + 1379 curBlocks + "/" + expectedBlocks + " blocks cached. " + 1380 "memlock limit = " + 1381 NativeIO.POSIX.getCacheManipulator().getMemlockLimit() + 1382 ". Waiting..."); 1383 } 1384 return false; 1385 } 1386 LOG.info("verifyExpectedCacheUsage: got " + 1387 curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " + 1388 curBlocks + "/" + expectedBlocks + " blocks cached. " + 1389 "memlock limit = " + 1390 NativeIO.POSIX.getCacheManipulator().getMemlockLimit()); 1391 return true; 1392 } 1393 }, 100, 60000); 1394 return expectedCacheUsed; 1395 } 1396 1397 /** 1398 * Round a long value up to a multiple of a factor. 1399 * 1400 * @param val The value. 1401 * @param factor The factor to round up to. Must be > 1. 1402 * @return The rounded value. 
1403 */ 1404 public static long roundUpToMultiple(long val, int factor) { 1405 assert (factor > 1); 1406 long c = (val + factor - 1) / factor; 1407 return c * factor; 1408 } 1409 1410 public static void checkComponentsEquals(byte[][] expected, byte[][] actual) { 1411 assertEquals("expected: " + DFSUtil.byteArray2PathString(expected) 1412 + ", actual: " + DFSUtil.byteArray2PathString(actual), expected.length, 1413 actual.length); 1414 int i = 0; 1415 for (byte[] e : expected) { 1416 byte[] actualComponent = actual[i++]; 1417 assertTrue("expected: " + DFSUtil.bytes2String(e) + ", actual: " 1418 + DFSUtil.bytes2String(actualComponent), 1419 Arrays.equals(e, actualComponent)); 1420 } 1421 } 1422 1423 /** 1424 * A short-circuit test context which makes it easier to get a short-circuit 1425 * configuration and set everything up. 1426 */ 1427 public static class ShortCircuitTestContext implements Closeable { 1428 private final String testName; 1429 private final TemporarySocketDirectory sockDir; 1430 private boolean closed = false; 1431 private final boolean formerTcpReadsDisabled; 1432 1433 public ShortCircuitTestContext(String testName) { 1434 this.testName = testName; 1435 this.sockDir = new TemporarySocketDirectory(); 1436 DomainSocket.disableBindPathValidation(); 1437 formerTcpReadsDisabled = DFSInputStream.tcpReadsDisabledForTesting; 1438 Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null); 1439 } 1440 1441 public Configuration newConfiguration() { 1442 Configuration conf = new Configuration(); 1443 conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); 1444 conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, 1445 new File(sockDir.getDir(), 1446 testName + "._PORT.sock").getAbsolutePath()); 1447 return conf; 1448 } 1449 1450 public String getTestName() { 1451 return testName; 1452 } 1453 1454 public void close() throws IOException { 1455 if (closed) return; 1456 closed = true; 1457 DFSInputStream.tcpReadsDisabledForTesting = 
formerTcpReadsDisabled; 1458 sockDir.close(); 1459 } 1460 } 1461 1462 /** 1463 * Verify that two files have the same contents. 1464 * 1465 * @param fs The file system containing the two files. 1466 * @param p1 The path of the first file. 1467 * @param p2 The path of the second file. 1468 * @param len The length of the two files. 1469 * @throws IOException 1470 */ 1471 public static void verifyFilesEqual(FileSystem fs, Path p1, Path p2, int len) 1472 throws IOException { 1473 final FSDataInputStream in1 = fs.open(p1); 1474 final FSDataInputStream in2 = fs.open(p2); 1475 for (int i = 0; i < len; i++) { 1476 assertEquals("Mismatch at byte " + i, in1.read(), in2.read()); 1477 } 1478 in1.close(); 1479 in2.close(); 1480 } 1481 1482 /** 1483 * Verify that two files have different contents. 1484 * 1485 * @param fs The file system containing the two files. 1486 * @param p1 The path of the first file. 1487 * @param p2 The path of the second file. 1488 * @param len The length of the two files. 1489 * @throws IOException 1490 */ 1491 public static void verifyFilesNotEqual(FileSystem fs, Path p1, Path p2, 1492 int len) 1493 throws IOException { 1494 final FSDataInputStream in1 = fs.open(p1); 1495 final FSDataInputStream in2 = fs.open(p2); 1496 try { 1497 for (int i = 0; i < len; i++) { 1498 if (in1.read() != in2.read()) { 1499 return; 1500 } 1501 } 1502 fail("files are equal, but should not be"); 1503 } finally { 1504 in1.close(); 1505 in2.close(); 1506 } 1507 } 1508 1509 /** 1510 * Helper function that verified blocks of a file are placed on the 1511 * expected storage type. 1512 * 1513 * @param fs The file system containing the the file. 1514 * @param client The DFS client used to access the file 1515 * @param path name to the file to verify 1516 * @param storageType expected storage type 1517 * @returns true if file exists and its blocks are located on the expected 1518 * storage type. 1519 * false otherwise. 
1520 */ 1521 public static boolean verifyFileReplicasOnStorageType(FileSystem fs, 1522 DFSClient client, Path path, StorageType storageType) throws IOException { 1523 if (!fs.exists(path)) { 1524 LOG.info("verifyFileReplicasOnStorageType: file " + path + "does not exist"); 1525 return false; 1526 } 1527 long fileLength = client.getFileInfo(path.toString()).getLen(); 1528 LocatedBlocks locatedBlocks = 1529 client.getLocatedBlocks(path.toString(), 0, fileLength); 1530 for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { 1531 if (locatedBlock.getStorageTypes()[0] != storageType) { 1532 LOG.info("verifyFileReplicasOnStorageType: for file " + path + 1533 ". Expect blk" + locatedBlock + 1534 " on Type: " + storageType + ". Actual Type: " + 1535 locatedBlock.getStorageTypes()[0]); 1536 return false; 1537 } 1538 } 1539 return true; 1540 } 1541 1542 /** 1543 * Helper function to create a key in the Key Provider. Defaults 1544 * to the first indexed NameNode's Key Provider. 1545 * 1546 * @param keyName The name of the key to create 1547 * @param cluster The cluster to create it in 1548 * @param conf Configuration to use 1549 */ 1550 public static void createKey(String keyName, MiniDFSCluster cluster, 1551 Configuration conf) 1552 throws NoSuchAlgorithmException, IOException { 1553 createKey(keyName, cluster, 0, conf); 1554 } 1555 1556 /** 1557 * Helper function to create a key in the Key Provider. 
1558 * 1559 * @param keyName The name of the key to create 1560 * @param cluster The cluster to create it in 1561 * @param idx The NameNode index 1562 * @param conf Configuration to use 1563 */ 1564 public static void createKey(String keyName, MiniDFSCluster cluster, 1565 int idx, Configuration conf) 1566 throws NoSuchAlgorithmException, IOException { 1567 NameNode nn = cluster.getNameNode(idx); 1568 KeyProvider provider = nn.getNamesystem().getProvider(); 1569 final KeyProvider.Options options = KeyProvider.options(conf); 1570 options.setDescription(keyName); 1571 options.setBitLength(128); 1572 provider.createKey(keyName, options); 1573 provider.flush(); 1574 } 1575 1576 /** 1577 * @return the node which is expected to run the recovery of the 1578 * given block, which is known to be under construction inside the 1579 * given NameNOde. 1580 */ 1581 public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn, 1582 ExtendedBlock blk) { 1583 BlockManager bm0 = nn.getNamesystem().getBlockManager(); 1584 BlockInfoContiguous storedBlock = bm0.getStoredBlock(blk.getLocalBlock()); 1585 assertTrue("Block " + blk + " should be under construction, " + 1586 "got: " + storedBlock, 1587 storedBlock instanceof BlockInfoContiguousUnderConstruction); 1588 BlockInfoContiguousUnderConstruction ucBlock = 1589 (BlockInfoContiguousUnderConstruction)storedBlock; 1590 // We expect that the replica with the most recent heart beat will be 1591 // the one to be in charge of the synchronization / recovery protocol. 
1592 final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations(); 1593 DatanodeStorageInfo expectedPrimary = storages[0]; 1594 long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor() 1595 .getLastUpdateMonotonic(); 1596 for (int i = 1; i < storages.length; i++) { 1597 final long lastUpdate = storages[i].getDatanodeDescriptor() 1598 .getLastUpdateMonotonic(); 1599 if (lastUpdate > mostRecentLastUpdate) { 1600 expectedPrimary = storages[i]; 1601 mostRecentLastUpdate = lastUpdate; 1602 } 1603 } 1604 return expectedPrimary.getDatanodeDescriptor(); 1605 } 1606 1607 public static void toolRun(Tool tool, String cmd, int retcode, String contain) 1608 throws Exception { 1609 String [] cmds = StringUtils.split(cmd, ' '); 1610 System.out.flush(); 1611 System.err.flush(); 1612 PrintStream origOut = System.out; 1613 PrintStream origErr = System.err; 1614 String output = null; 1615 int ret = 0; 1616 try { 1617 ByteArrayOutputStream bs = new ByteArrayOutputStream(1024); 1618 PrintStream out = new PrintStream(bs); 1619 System.setOut(out); 1620 System.setErr(out); 1621 ret = tool.run(cmds); 1622 System.out.flush(); 1623 System.err.flush(); 1624 out.close(); 1625 output = bs.toString(); 1626 } finally { 1627 System.setOut(origOut); 1628 System.setErr(origErr); 1629 } 1630 System.out.println("Output for command: " + cmd + " retcode: " + ret); 1631 if (output != null) { 1632 System.out.println(output); 1633 } 1634 assertEquals(retcode, ret); 1635 if (contain != null) { 1636 assertTrue("The real output is: " + output + ".\n It should contain: " 1637 + contain, output.contains(contain)); 1638 } 1639 } 1640 1641 public static void FsShellRun(String cmd, int retcode, String contain, 1642 Configuration conf) throws Exception { 1643 FsShell shell = new FsShell(new Configuration(conf)); 1644 toolRun(shell, cmd, retcode, contain); 1645 } 1646 1647 public static void DFSAdminRun(String cmd, int retcode, String contain, 1648 Configuration conf) throws Exception { 
1649 DFSAdmin admin = new DFSAdmin(new Configuration(conf)); 1650 toolRun(admin, cmd, retcode, contain); 1651 } 1652 1653 public static void FsShellRun(String cmd, Configuration conf) 1654 throws Exception { 1655 FsShellRun(cmd, 0, null, conf); 1656 } 1657 1658 public static void addDataNodeLayoutVersion(final int lv, final String description) 1659 throws NoSuchFieldException, IllegalAccessException { 1660 Preconditions.checkState(lv < DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION); 1661 1662 // Override {@link DataNodeLayoutVersion#CURRENT_LAYOUT_VERSION} via reflection. 1663 Field modifiersField = Field.class.getDeclaredField("modifiers"); 1664 modifiersField.setAccessible(true); 1665 Field field = DataNodeLayoutVersion.class.getField("CURRENT_LAYOUT_VERSION"); 1666 field.setAccessible(true); 1667 modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); 1668 field.setInt(null, lv); 1669 1670 // Override {@link HdfsConstants#DATANODE_LAYOUT_VERSION} 1671 field = HdfsConstants.class.getField("DATANODE_LAYOUT_VERSION"); 1672 field.setAccessible(true); 1673 modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); 1674 field.setInt(null, lv); 1675 1676 // Inject the feature into the FEATURES map. 1677 final LayoutVersion.FeatureInfo featureInfo = 1678 new LayoutVersion.FeatureInfo(lv, lv + 1, description, false); 1679 final LayoutVersion.LayoutFeature feature = 1680 new LayoutVersion.LayoutFeature() { 1681 @Override 1682 public LayoutVersion.FeatureInfo getInfo() { 1683 return featureInfo; 1684 } 1685 }; 1686 1687 // Update the FEATURES map with the new layout version. 1688 LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES, 1689 new LayoutVersion.LayoutFeature[] { feature }); 1690 } 1691 1692 /** 1693 * Wait for datanode to reach alive or dead state for waitTime given in 1694 * milliseconds. 
1695 */ 1696 public static void waitForDatanodeState( 1697 final MiniDFSCluster cluster, final String nodeID, 1698 final boolean alive, int waitTime) 1699 throws TimeoutException, InterruptedException { 1700 GenericTestUtils.waitFor(new Supplier<Boolean>() { 1701 @Override 1702 public Boolean get() { 1703 FSNamesystem namesystem = cluster.getNamesystem(); 1704 final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode( 1705 namesystem, nodeID); 1706 return (dd.isAlive == alive); 1707 } 1708 }, 100, waitTime); 1709 } 1710 1711 public static void setNameNodeLogLevel(Level level) { 1712 GenericTestUtils.setLogLevel(FSNamesystem.LOG, level); 1713 GenericTestUtils.setLogLevel(BlockManager.LOG, level); 1714 GenericTestUtils.setLogLevel(LeaseManager.LOG, level); 1715 GenericTestUtils.setLogLevel(NameNode.LOG, level); 1716 GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level); 1717 GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level); 1718 } 1719 1720 /** 1721 * Change the length of a block at datanode dnIndex 1722 */ 1723 public static boolean changeReplicaLength(MiniDFSCluster cluster, 1724 ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException { 1725 File blockFile = cluster.getBlockFile(dnIndex, blk); 1726 if (blockFile != null && blockFile.exists()) { 1727 RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); 1728 raFile.setLength(raFile.length()+lenDelta); 1729 raFile.close(); 1730 return true; 1731 } 1732 LOG.info("failed to change length of block " + blk); 1733 return false; 1734 } 1735 1736 /** 1737 * Set the datanode dead 1738 */ 1739 public static void setDatanodeDead(DatanodeInfo dn) { 1740 dn.setLastUpdate(0); 1741 dn.setLastUpdateMonotonic(0); 1742 } 1743 1744 /** 1745 * Update lastUpdate and lastUpdateMonotonic with some offset. 
*
   * @param dn datanode info whose two timestamps are rewritten
   * @param offset value added to {@code Time.now()} and
   *          {@code Time.monotonicNow()} respectively
   */
  public static void resetLastUpdatesWithOffset(DatanodeInfo dn, long offset) {
    final long shiftedNow = Time.now() + offset;
    dn.setLastUpdate(shiftedNow);
    final long shiftedMonotonic = Time.monotonicNow() + offset;
    dn.setLastUpdateMonotonic(shiftedMonotonic);
  }

  /**
   * Builds a one-element report array for a single block on a single storage.
   *
   * @param block the block being reported
   * @param blockStatus status to report for the block
   * @param storage storage the report is attributed to
   * @return an array holding exactly one report, which in turn holds exactly
   *         one entry for {@code block}
   */
  public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
      Block block, BlockStatus blockStatus, DatanodeStorage storage) {
    final ReceivedDeletedBlockInfo entry =
        new ReceivedDeletedBlockInfo(block, blockStatus, null);
    final ReceivedDeletedBlockInfo[] entries = { entry };
    final StorageReceivedDeletedBlocks singleReport =
        new StorageReceivedDeletedBlocks(storage, entries);
    return new StorageReceivedDeletedBlocks[] { singleReport };
  }

  /**
   * Adds a block to a file.
   * This method only manipulates NameNode
   * states of the file and the block without injecting data to DataNode.
   * It does mimic block reports.
   * You should disable periodical heartbeat before use this.
   * @param dataNodes List DataNodes to host the block
   * @param fs file system whose client issues the addBlock call
   * @param ns namesystem that receives the mimicked incremental block reports
   * @param file path of the file the block is added to
   * @param fileNode inode of that file
   * @param clientName client name passed to addBlock
   * @param previous Previous block in the file
   * @param len block size
   * @return The added block
   */
  public static Block addBlockToFile(
      List<DataNode> dataNodes, DistributedFileSystem fs, FSNamesystem ns,
      String file, INodeFile fileNode,
      String clientName, ExtendedBlock previous, int len)
      throws Exception {
    fs.getClient().namenode.addBlock(file, clientName, previous, null,
        fileNode.getId(), null);

    final BlockInfoContiguous newBlock = fileNode.getLastBlock();
    final int replicaCount = fileNode.getBlockReplication();
    assert dataNodes.size() >= replicaCount;

    // Two rounds of incremental block reports, each covering the first
    // replicaCount datanodes:
    //   1. RECEIVING_BLOCK IBR, reported with length 0
    //   2. RECEIVED_BLOCK IBR, reported with the final length
    final ReceivedDeletedBlockInfo.BlockStatus[] phases = {
        ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK,
        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK
    };
    final int[] reportedLengths = { 0, len };
    for (int phase = 0; phase < phases.length; phase++) {
      for (int i = 0; i < replicaCount; i++) {
        final DataNode reporter = dataNodes.get(i);
        final Block reported = new Block(newBlock.getBlockId() + i,
            reportedLengths[phase], newBlock.getGenerationStamp());
        final DatanodeStorage storage =
            new DatanodeStorage(UUID.randomUUID().toString());
        final StorageReceivedDeletedBlocks[] reports =
            DFSTestUtil.makeReportForReceivedBlock(
                reported, phases[phase], storage);
        for (StorageReceivedDeletedBlocks report : reports) {
          ns.processIncrementalBlockReport(reporter.getDatanodeId(), report);
        }
      }
    }

    newBlock.setNumBytes(len);
    return newBlock;
  }
}