1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hdfs;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Charsets;
23 import com.google.common.base.Joiner;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Supplier;
26 import com.google.common.collect.Lists;
27 import com.google.common.collect.Maps;
28 
29 import org.apache.commons.io.FileUtils;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.crypto.key.KeyProvider;
34 import org.apache.hadoop.fs.BlockLocation;
35 import org.apache.hadoop.fs.CacheFlag;
36 import org.apache.hadoop.fs.CommonConfigurationKeys;
37 import org.apache.hadoop.fs.CreateFlag;
38 import org.apache.hadoop.fs.FileContext;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.FileSystem.Statistics;
41 import org.apache.hadoop.fs.FSDataInputStream;
42 import org.apache.hadoop.fs.FSDataOutputStream;
43 import org.apache.hadoop.fs.FsShell;
44 import org.apache.hadoop.fs.Options.Rename;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.fs.permission.AclEntry;
47 import org.apache.hadoop.fs.permission.AclEntryScope;
48 import org.apache.hadoop.fs.permission.AclEntryType;
49 import org.apache.hadoop.fs.permission.FsAction;
50 import org.apache.hadoop.fs.permission.FsPermission;
51 import org.apache.hadoop.fs.StorageType;
52 import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
53 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
54 import org.apache.hadoop.hdfs.protocol.*;
55 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
56 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
57 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
58 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
59 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
60 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
61 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
62 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
63 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
64 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
65 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
66 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
69 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
70 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
71 import org.apache.hadoop.hdfs.server.common.StorageInfo;
72 import org.apache.hadoop.hdfs.server.datanode.DataNode;
73 import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
74 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
75 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
76 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
77 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
78 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
79 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
80 import org.apache.hadoop.hdfs.server.namenode.NameNode;
81 import org.apache.hadoop.hdfs.server.namenode.ha
82         .ConfiguredFailoverProxyProvider;
83 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
84 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
85 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
86 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
87 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
88 import org.apache.hadoop.hdfs.tools.DFSAdmin;
89 import org.apache.hadoop.io.IOUtils;
90 import org.apache.hadoop.io.nativeio.NativeIO;
91 import org.apache.hadoop.net.NetUtils;
92 import org.apache.hadoop.net.unix.DomainSocket;
93 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
94 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
95 import org.apache.hadoop.security.UserGroupInformation;
96 import org.apache.hadoop.security.token.Token;
97 import org.apache.hadoop.test.GenericTestUtils;
98 import org.apache.hadoop.util.StringUtils;
99 import org.apache.hadoop.util.Time;
100 import org.apache.hadoop.util.Tool;
101 import org.apache.hadoop.util.VersionInfo;
102 import org.apache.log4j.Level;
103 import org.junit.Assume;
104 import org.mockito.internal.util.reflection.Whitebox;
105 
106 import java.io.*;
107 import java.lang.reflect.Field;
108 import java.lang.reflect.Modifier;
109 import java.net.*;
110 import java.nio.ByteBuffer;
111 import java.security.NoSuchAlgorithmException;
112 import java.security.PrivilegedExceptionAction;
113 import java.util.*;
114 import java.util.concurrent.TimeoutException;
115 
116 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
117 import static org.apache.hadoop.fs.CreateFlag.CREATE;
118 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
119 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
120 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
121 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
122 import static org.junit.Assert.assertEquals;
123 import static org.junit.Assert.assertTrue;
124 import static org.junit.Assert.fail;
125 
126 /** Utilities for HDFS tests */
127 public class DFSTestUtil {
128 
129   private static final Log LOG = LogFactory.getLog(DFSTestUtil.class);
130 
131   private static final Random gen = new Random();
132   private static final String[] dirNames = {
133     "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
134   };
135 
136   private final int maxLevels;
137   private final int maxSize;
138   private final int minSize;
139   private final int nFiles;
140   private MyFile[] files;
141 
142   /** Creates a new instance of DFSTestUtil
143    *
144    * @param nFiles Number of files to be created
145    * @param maxLevels Maximum number of directory levels
146    * @param maxSize Maximum size for file
147    * @param minSize Minimum size for file
148    */
149   private DFSTestUtil(int nFiles, int maxLevels, int maxSize, int minSize) {
150     this.nFiles = nFiles;
151     this.maxLevels = maxLevels;
152     this.maxSize = maxSize;
153     this.minSize = minSize;
154   }
155 
156   /** Creates a new instance of DFSTestUtil
157    *
158    * @param testName Name of the test from where this utility is used
159    * @param nFiles Number of files to be created
160    * @param maxLevels Maximum number of directory levels
161    * @param maxSize Maximum size for file
162    * @param minSize Minimum size for file
163    */
164   public DFSTestUtil(String testName, int nFiles, int maxLevels, int maxSize,
165       int minSize) {
166     this.nFiles = nFiles;
167     this.maxLevels = maxLevels;
168     this.maxSize = maxSize;
169     this.minSize = minSize;
170   }
171 
172   /**
173    * When formatting a namenode, a cluster id must be provided.
174    * @param conf the configuration to use when formatting
175    * @throws IOException
176    */
177   public static void formatNameNode(Configuration conf) throws IOException {
178     String clusterId = StartupOption.FORMAT.getClusterId();
179     if(clusterId == null || clusterId.isEmpty())
180       StartupOption.FORMAT.setClusterId("testClusterID");
181     // Use a copy of conf as it can be altered by namenode during format.
182     NameNode.format(new Configuration(conf));
183   }
184 
185   /**
186    * Create a new HA-enabled configuration.
187    */
188   public static Configuration newHAConfiguration(final String logicalName) {
189     Configuration conf = new Configuration();
190     addHAConfiguration(conf, logicalName);
191     return conf;
192   }
193 
194   /**
195    * Add a new HA configuration.
196    */
197   public static void addHAConfiguration(Configuration conf,
198       final String logicalName) {
199     String nsIds = conf.get(DFSConfigKeys.DFS_NAMESERVICES);
200     if (nsIds == null) {
201       conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
202     } else { // append the nsid
203       conf.set(DFSConfigKeys.DFS_NAMESERVICES, nsIds + "," + logicalName);
204     }
205     conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
206             logicalName), "nn1,nn2");
207     conf.set(DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "" +
208             "." + logicalName,
209             ConfiguredFailoverProxyProvider.class.getName());
210     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
211   }
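  // Illustrative usage sketch (not part of the utility): an HA test would typically
  // build its configuration with these helpers before starting a MiniDFSCluster.
  // The logical name "minidfs-ns" is just an example value.
  //
  //   Configuration conf = DFSTestUtil.newHAConfiguration("minidfs-ns");
  //   DFSTestUtil.setFakeHttpAddresses(conf, "minidfs-ns");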
212 
213   public static void setFakeHttpAddresses(Configuration conf,
214       final String logicalName) {
215     conf.set(DFSUtil.addKeySuffixes(
216         DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
217         logicalName, "nn1"), "127.0.0.1:12345");
218     conf.set(DFSUtil.addKeySuffixes(
219         DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
220         logicalName, "nn2"), "127.0.0.1:12346");
221   }
222 
223   public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) {
224     Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog);
225     Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog);
226   }
227 
228   /** class MyFile contains enough information to recreate the contents of
229    * a single file.
230    */
231   private class MyFile {
232 
233     private String name = "";
234     private final int size;
235     private final long seed;
236 
237     MyFile() {
238       int nLevels = gen.nextInt(maxLevels);
239       if (nLevels != 0) {
240         int[] levels = new int[nLevels];
241         for (int idx = 0; idx < nLevels; idx++) {
242           levels[idx] = gen.nextInt(10);
243         }
244         StringBuffer sb = new StringBuffer();
245         for (int idx = 0; idx < nLevels; idx++) {
246           sb.append(dirNames[levels[idx]]);
247           sb.append("/");
248         }
249         name = sb.toString();
250       }
251       long fidx = -1;
252       while (fidx < 0) { fidx = gen.nextLong(); }
253       name = name + Long.toString(fidx);
254       size = minSize + gen.nextInt(maxSize - minSize);
255       seed = gen.nextLong();
256     }
257 
258     String getName() { return name; }
259     int getSize() { return size; }
260     long getSeed() { return seed; }
261   }
262 
263   public void createFiles(FileSystem fs, String topdir) throws IOException {
264     createFiles(fs, topdir, (short)3);
265   }
266 
267   /** Create nFiles files with random names and directory hierarchies,
268    *  filled with random (but reproducible) data.
269    */
270   public void createFiles(FileSystem fs, String topdir,
271                    short replicationFactor) throws IOException {
272     files = new MyFile[nFiles];
273 
274     for (int idx = 0; idx < nFiles; idx++) {
275       files[idx] = new MyFile();
276     }
277 
278     Path root = new Path(topdir);
279 
280     for (int idx = 0; idx < nFiles; idx++) {
281       createFile(fs, new Path(root, files[idx].getName()), files[idx].getSize(),
282           replicationFactor, files[idx].getSeed());
283     }
284   }
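  // Illustrative usage sketch, assuming "fs" is the FileSystem of a running
  // MiniDFSCluster: create the random tree, then verify it with checkFiles below.
  //
  //   DFSTestUtil util = new DFSTestUtil("MyTest", 20, 3, 8 * 1024, 1);
  //   util.createFiles(fs, "/srcdat");
  //   assertTrue(util.checkFiles(fs, "/srcdat"));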
285 
286   public static String readFile(FileSystem fs, Path fileName)
287       throws IOException {
288     byte buf[] = readFileBuffer(fs, fileName);
289     return new String(buf, 0, buf.length);
290   }
291 
292   public static byte[] readFileBuffer(FileSystem fs, Path fileName)
293       throws IOException {
294     ByteArrayOutputStream os = new ByteArrayOutputStream();
295     try {
296       FSDataInputStream in = fs.open(fileName);
297       try {
298         IOUtils.copyBytes(in, os, 1024, true);
299         return os.toByteArray();
300       } finally {
301         in.close();
302       }
303     } finally {
304       os.close();
305     }
306   }
307 
308   public static void createFile(FileSystem fs, Path fileName, long fileLen,
309       short replFactor, long seed) throws IOException {
310     if (!fs.mkdirs(fileName.getParent())) {
311       throw new IOException("Mkdirs failed to create " +
312                             fileName.getParent().toString());
313     }
314     FSDataOutputStream out = null;
315     try {
316       out = fs.create(fileName, replFactor);
317       byte[] toWrite = new byte[1024];
318       Random rb = new Random(seed);
319       long bytesToWrite = fileLen;
320       while (bytesToWrite>0) {
321         rb.nextBytes(toWrite);
322         int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite;
323 
324         out.write(toWrite, 0, bytesToWriteNext);
325         bytesToWrite -= bytesToWriteNext;
326       }
327       out.close();
328       out = null;
329     } finally {
330       IOUtils.closeStream(out);
331     }
332   }
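  // Illustrative sketch: because the contents are generated from the seed, a test
  // can recreate the expected bytes instead of keeping them around. "fs" and the
  // path below are assumed to come from the calling test.
  //
  //   long seed = 0xDEADBEEFL;
  //   DFSTestUtil.createFile(fs, new Path("/foo"), 1024, (short) 3, seed);
  //   byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(seed, 1024);
  //   // compare with DFSTestUtil.readFileBuffer(fs, new Path("/foo"))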
333 
334   public static void createFile(FileSystem fs, Path fileName, int bufferLen,
335                                 long fileLen, long blockSize, short replFactor, long seed)
336       throws IOException {
337     createFile(fs, fileName, false, bufferLen, fileLen, blockSize, replFactor,
338       seed, false);
339   }
340 
341   public static void createFile(FileSystem fs, Path fileName,
342       boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
343       short replFactor, long seed, boolean flush) throws IOException {
344         createFile(fs, fileName, isLazyPersist, bufferLen, fileLen, blockSize,
345           replFactor, seed, flush, null);
346   }
347 
348   public static void createFile(FileSystem fs, Path fileName,
349       boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
350       short replFactor, long seed, boolean flush,
351       InetSocketAddress[] favoredNodes) throws IOException {
352   assert bufferLen > 0;
353   if (!fs.mkdirs(fileName.getParent())) {
354       throw new IOException("Mkdirs failed to create " +
355                 fileName.getParent().toString());
356   }
357   FSDataOutputStream out = null;
358   EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
359   createFlags.add(OVERWRITE);
360   if (isLazyPersist) {
361     createFlags.add(LAZY_PERSIST);
362   }
363   try {
364     if (favoredNodes == null) {
365       out = fs.create(
366         fileName,
367         FsPermission.getFileDefault(),
368         createFlags,
369         fs.getConf().getInt(
370           CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
371         replFactor, blockSize, null);
372     } else {
373       out = ((DistributedFileSystem) fs).create(fileName,
374         FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
375         null, favoredNodes);
376     }
377       if (fileLen > 0) {
378         byte[] toWrite = new byte[bufferLen];
379         Random rb = new Random(seed);
380         long bytesToWrite = fileLen;
381         while (bytesToWrite>0) {
382           rb.nextBytes(toWrite);
383           int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
384             : (int) bytesToWrite;
385 
386             out.write(toWrite, 0, bytesToWriteNext);
387             bytesToWrite -= bytesToWriteNext;
388         }
389         if (flush) {
390           out.hsync();
391         }
392       }
393     } finally {
394       if (out != null) {
395         out.close();
396       }
397     }
398   }
399 
400   public static byte[] calculateFileContentsFromSeed(long seed, int length) {
401     Random rb = new Random(seed);
402     byte val[] = new byte[length];
403     rb.nextBytes(val);
404     return val;
405   }
406 
407   /** check if the files have been copied correctly. */
408   public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
409     Path root = new Path(topdir);
410 
411     for (int idx = 0; idx < nFiles; idx++) {
412       Path fPath = new Path(root, files[idx].getName());
413       FSDataInputStream in = fs.open(fPath);
414       byte[] toRead = new byte[files[idx].getSize()];
415       byte[] toCompare = new byte[files[idx].getSize()];
416       Random rb = new Random(files[idx].getSeed());
417       rb.nextBytes(toCompare);
418       in.readFully(0, toRead);
419       in.close();
420       for (int i = 0; i < toRead.length; i++) {
421         if (toRead[i] != toCompare[i]) {
422           return false;
423         }
424       }
425       toRead = null;
426       toCompare = null;
427     }
428 
429     return true;
430   }
431 
432   void setReplication(FileSystem fs, String topdir, short value)
433                                               throws IOException {
434     Path root = new Path(topdir);
435     for (int idx = 0; idx < nFiles; idx++) {
436       Path fPath = new Path(root, files[idx].getName());
437       fs.setReplication(fPath, value);
438     }
439   }
440 
441   /*
442    * Waits for the replication factor of all files to reach the
443    * specified target.
444    */
445   public void waitReplication(FileSystem fs, String topdir, short value)
446       throws IOException, InterruptedException, TimeoutException {
447     Path root = new Path(topdir);
448 
449     /** wait for the replication factor to settle down */
450     for (int idx = 0; idx < nFiles; idx++) {
451       waitReplication(fs, new Path(root, files[idx].getName()), value);
452     }
453   }
454 
455   /*
456    * Check whether all replicas of the given block in the given file are corrupt.
457    */
458   public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
459       Path file, int blockNo) throws IOException {
460     DFSClient client = new DFSClient(new InetSocketAddress("localhost",
461         cluster.getNameNodePort()), cluster.getConfiguration(0));
462     LocatedBlocks blocks;
463     try {
464        blocks = client.getNamenode().getBlockLocations(
465            file.toString(), 0, Long.MAX_VALUE);
466     } finally {
467       client.close();
468     }
469     return blocks.get(blockNo).isCorrupt();
470   }
471 
472   /*
473    * Wait up to 20s for the given block to be replicated across
474    * the requested number of racks, with the requested number of
475    * replicas, and the requested number of replicas still needed.
476    */
477   public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
478       int racks, int replicas, int neededReplicas)
479       throws TimeoutException, InterruptedException {
480     int curRacks = 0;
481     int curReplicas = 0;
482     int curNeededReplicas = 0;
483     int count = 0;
484     final int ATTEMPTS = 20;
485 
486     do {
487       Thread.sleep(1000);
488       int[] r = BlockManagerTestUtil.getReplicaInfo(cluster.getNamesystem(),
489           b.getLocalBlock());
490       curRacks = r[0];
491       curReplicas = r[1];
492       curNeededReplicas = r[2];
493       count++;
494     } while ((curRacks != racks ||
495               curReplicas != replicas ||
496               curNeededReplicas != neededReplicas) && count < ATTEMPTS);
497 
498     if (count == ATTEMPTS) {
499       throw new TimeoutException("Timed out waiting for replication."
500           + " Needed replicas = "+neededReplicas
501           + " Cur needed replicas = "+curNeededReplicas
502           + " Replicas = "+replicas+" Cur replicas = "+curReplicas
503           + " Racks = "+racks+" Cur racks = "+curRacks);
504     }
505   }
506 
507   /**
508    * Keep accessing the given file until the namenode reports that the
509    * given block in the file contains the given number of corrupt replicas.
510    */
511   public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
512       Path file, ExtendedBlock b, int corruptRepls)
513       throws TimeoutException, InterruptedException {
514     int count = 0;
515     final int ATTEMPTS = 50;
516     int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
517     while (repls != corruptRepls && count < ATTEMPTS) {
518       try {
519         IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
520             512, true);
521       } catch (IOException e) {
522         // Swallow exceptions
523       }
524       System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
525       count++;
526       // check more often so corrupt block reports are not easily missed
527       for (int i = 0; i < 10; i++) {
528         repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
529         Thread.sleep(100);
530         if (repls == corruptRepls) {
531           break;
532         }
533       }
534     }
535     if (count == ATTEMPTS) {
536       throw new TimeoutException("Timed out waiting for corrupt replicas."
537           + " Waiting for "+corruptRepls+", but only found "+repls);
538     }
539   }
540 
541   /*
542    * Wait up to 20s for the given DN (IP:port) to be decommissioned
543    */
544   public static void waitForDecommission(FileSystem fs, String name)
545       throws IOException, InterruptedException, TimeoutException {
546     DatanodeInfo dn = null;
547     int count = 0;
548     final int ATTEMPTS = 20;
549 
550     do {
551       Thread.sleep(1000);
552       DistributedFileSystem dfs = (DistributedFileSystem)fs;
553       for (DatanodeInfo info : dfs.getDataNodeStats()) {
554         if (name.equals(info.getXferAddr())) {
555           dn = info;
556         }
557       }
558       count++;
559     } while ((dn == null ||
560               dn.isDecommissionInProgress() ||
561               !dn.isDecommissioned()) && count < ATTEMPTS);
562 
563     if (count == ATTEMPTS) {
564       throw new TimeoutException("Timed out waiting for datanode "
565           + name + " to decommission.");
566     }
567   }
568 
569   /*
570    * Returns the index of the first datanode which has a copy
571    * of the given block, or -1 if no such datanode exists.
572    */
573   public static int firstDnWithBlock(MiniDFSCluster cluster, ExtendedBlock b)
574       throws IOException {
575     int numDatanodes = cluster.getDataNodes().size();
576     for (int i = 0; i < numDatanodes; i++) {
577       String blockContent = cluster.readBlockOnDataNode(i, b);
578       if (blockContent != null) {
579         return i;
580       }
581     }
582     return -1;
583   }
584 
585   /*
586    * Return the total capacity of all live DNs.
587    */
588   public static long getLiveDatanodeCapacity(DatanodeManager dm) {
589     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
590     dm.fetchDatanodes(live, null, false);
591     long capacity = 0;
592     for (final DatanodeDescriptor dn : live) {
593       capacity += dn.getCapacity();
594     }
595     return capacity;
596   }
597 
598   /*
599    * Return the capacity of the given live DN.
600    */
601   public static long getDatanodeCapacity(DatanodeManager dm, int index) {
602     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
603     dm.fetchDatanodes(live, null, false);
604     return live.get(index).getCapacity();
605   }
606 
607   /*
608    * Wait for the given # live/dead DNs, total capacity, and # vol failures.
609    */
610   public static void waitForDatanodeStatus(DatanodeManager dm, int expectedLive,
611       int expectedDead, long expectedVolFails, long expectedTotalCapacity,
612       long timeout) throws InterruptedException, TimeoutException {
613     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
614     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
615     final int ATTEMPTS = 10;
616     int count = 0;
617     long currTotalCapacity = 0;
618     int volFails = 0;
619 
620     do {
621       Thread.sleep(timeout);
622       live.clear();
623       dead.clear();
624       dm.fetchDatanodes(live, dead, false);
625       currTotalCapacity = 0;
626       volFails = 0;
627       for (final DatanodeDescriptor dd : live) {
628         currTotalCapacity += dd.getCapacity();
629         volFails += dd.getVolumeFailures();
630       }
631       count++;
632     } while ((expectedLive != live.size() ||
633               expectedDead != dead.size() ||
634               expectedTotalCapacity != currTotalCapacity ||
635               expectedVolFails != volFails)
636              && count < ATTEMPTS);
637 
638     if (count == ATTEMPTS) {
639       throw new TimeoutException("Timed out waiting for capacity."
640           + " Live = "+live.size()+" Expected = "+expectedLive
641           + " Dead = "+dead.size()+" Expected = "+expectedDead
642           + " Total capacity = "+currTotalCapacity
643           + " Expected = "+expectedTotalCapacity
644           + " Vol Fails = "+volFails+" Expected = "+expectedVolFails);
645     }
646   }
647 
648   /*
649    * Wait for the given DN to consider itself dead.
650    */
651   public static void waitForDatanodeDeath(DataNode dn)
652       throws InterruptedException, TimeoutException {
653     final int ATTEMPTS = 10;
654     int count = 0;
655     do {
656       Thread.sleep(1000);
657       count++;
658     } while (dn.isDatanodeUp() && count < ATTEMPTS);
659 
660     if (count == ATTEMPTS) {
661       throw new TimeoutException("Timed out waiting for DN to die");
662     }
663   }
664 
665   /** return list of filenames created as part of createFiles */
666   public String[] getFileNames(String topDir) {
667     if (nFiles == 0)
668       return new String[]{};
669     else {
670       String[] fileNames =  new String[nFiles];
671       for (int idx=0; idx < nFiles; idx++) {
672         fileNames[idx] = topDir + "/" + files[idx].getName();
673       }
674       return fileNames;
675     }
676   }
677 
678   /**
679    * Wait for the given file to reach the given replication factor.
680    * @throws TimeoutException if we fail to sufficiently replicate the file
681    */
682   public static void waitReplication(FileSystem fs, Path fileName, short replFactor)
683       throws IOException, InterruptedException, TimeoutException {
684     boolean correctReplFactor;
685     final int ATTEMPTS = 40;
686     int count = 0;
687 
688     do {
689       correctReplFactor = true;
690       BlockLocation locs[] = fs.getFileBlockLocations(
691         fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
692       count++;
693       for (int j = 0; j < locs.length; j++) {
694         String[] hostnames = locs[j].getNames();
695         if (hostnames.length != replFactor) {
696           correctReplFactor = false;
697           System.out.println("Block " + j + " of file " + fileName
698               + " has replication factor " + hostnames.length
699               + " (desired " + replFactor + "); locations "
700               + Joiner.on(' ').join(hostnames));
701           Thread.sleep(1000);
702           break;
703         }
704       }
705       if (correctReplFactor) {
706         System.out.println("All blocks of file " + fileName
707             + " verified to have replication factor " + replFactor);
708       }
709     } while (!correctReplFactor && count < ATTEMPTS);
710 
711     if (count == ATTEMPTS) {
712       throw new TimeoutException("Timed out waiting for " + fileName +
713           " to reach " + replFactor + " replicas");
714     }
715   }
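  // Illustrative sketch: tests that need fully replicated data usually pair
  // createFile with this wait. "fs" is assumed to be the cluster's FileSystem.
  //
  //   Path p = new Path("/replicated-file");
  //   DFSTestUtil.createFile(fs, p, 4096, (short) 3, 0L);
  //   DFSTestUtil.waitReplication(fs, p, (short) 3);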
716 
717   /** delete directory and everything underneath it.*/
718   public void cleanup(FileSystem fs, String topdir) throws IOException {
719     Path root = new Path(topdir);
720     fs.delete(root, true);
721     files = null;
722   }
723 
724   public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
725     HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
726     try {
727       in.readByte();
728       return in.getCurrentBlock();
729     } finally {
730       in.close();
731     }
732   }
733 
734   public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
735       throws IOException {
736     return ((HdfsDataInputStream) in).getAllBlocks();
737   }
738 
739   public static List<LocatedBlock> getAllBlocks(FileSystem fs, Path path)
740       throws IOException {
741     HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
742     return in.getAllBlocks();
743   }
744 
745   public static Token<BlockTokenIdentifier> getBlockToken(
746       FSDataOutputStream out) {
747     return ((DFSOutputStream) out.getWrappedStream()).getBlockToken();
748   }
749 
750   public static String readFile(File f) throws IOException {
751     StringBuilder b = new StringBuilder();
752     BufferedReader in = new BufferedReader(new FileReader(f));
753     for(int c; (c = in.read()) != -1; b.append((char)c));
754     in.close();
755     return b.toString();
756   }
757 
758   /* Write the given string to the given file */
759   public static void writeFile(FileSystem fs, Path p, String s)
760       throws IOException {
761     if (fs.exists(p)) {
762       fs.delete(p, true);
763     }
764     InputStream is = new ByteArrayInputStream(s.getBytes());
765     FSDataOutputStream os = fs.create(p);
766     IOUtils.copyBytes(is, os, s.length(), true);
767   }
768 
769   /* Append the given string to the given file */
770   public static void appendFile(FileSystem fs, Path p, String s)
771       throws IOException {
772     assert fs.exists(p);
773     InputStream is = new ByteArrayInputStream(s.getBytes());
774     FSDataOutputStream os = fs.append(p);
775     IOUtils.copyBytes(is, os, s.length(), true);
776   }
777 
778   /**
779    * Append specified length of bytes to a given file
780    * @param fs The file system
781    * @param p Path of the file to append
782    * @param length Length of bytes to append to the file
783    * @throws IOException
784    */
785   public static void appendFile(FileSystem fs, Path p, int length)
786       throws IOException {
787     assert fs.exists(p);
788     assert length >= 0;
789     byte[] toAppend = new byte[length];
790     Random random = new Random();
791     random.nextBytes(toAppend);
792     FSDataOutputStream out = fs.append(p);
793     out.write(toAppend);
794     out.close();
795   }
796 
797   /**
798    * @return url content as string (UTF-8 encoding assumed)
799    */
800   public static String urlGet(URL url) throws IOException {
801     return new String(urlGetBytes(url), Charsets.UTF_8);
802   }
803 
804   /**
805    * @return URL contents as a byte array
806    */
807   public static byte[] urlGetBytes(URL url) throws IOException {
808     URLConnection conn = url.openConnection();
809     HttpURLConnection hc = (HttpURLConnection)conn;
810 
811     assertEquals(HttpURLConnection.HTTP_OK, hc.getResponseCode());
812     ByteArrayOutputStream out = new ByteArrayOutputStream();
813     IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
814     return out.toByteArray();
815   }
816 
817   /**
818    * Mock class that supplies the group mapping for fake users.
819    *
820    */
821   static class MockUnixGroupsMapping extends ShellBasedUnixGroupsMapping {
822     static Map<String, String []> fakeUser2GroupsMap;
823     private static final List<String> defaultGroups;
824     static {
825       defaultGroups = new ArrayList<String>(1);
826       defaultGroups.add("supergroup");
827       fakeUser2GroupsMap = new HashMap<String, String[]>();
828     }
829 
830     @Override
831     public List<String> getGroups(String user) throws IOException {
832       boolean found = false;
833 
834       // check to see if this is one of the fake users
835       List<String> l = new ArrayList<String>();
836       for(String u : fakeUser2GroupsMap.keySet()) {
837         if(user.equals(u)) {
838           found = true;
839           for(String gr : fakeUser2GroupsMap.get(u)) {
840             l.add(gr);
841           }
842         }
843       }
844 
845       // default
846       if(!found) {
847         l =  super.getGroups(user);
848         if(l.size() == 0) {
849           System.out.println("failed to get real group for " + user +
850               "; using default");
851           return defaultGroups;
852         }
853       }
854       return l;
855     }
856   }
857 
858   /**
859    * Update the configuration with the fake class for mapping users to groups.
860    * @param conf the configuration to update
861    * @param map user to groups mapping
862    */
863   public static void updateConfWithFakeGroupMapping(Configuration conf,
864       Map<String, String[]> map) {
865     if(map!=null) {
866       MockUnixGroupsMapping.fakeUser2GroupsMap = map;
867     }
868 
869     // fake mapping user to groups
870     conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
871         DFSTestUtil.MockUnixGroupsMapping.class,
872         ShellBasedUnixGroupsMapping.class);
873 
874   }
875 
876   /**
877    * Get a FileSystem instance as the specified user in a doAs block.
878    */
879   public static FileSystem getFileSystemAs(UserGroupInformation ugi,
880       final Configuration conf) throws IOException {
881     try {
882       return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
883         @Override
884         public FileSystem run() throws Exception {
885           return FileSystem.get(conf);
886         }
887       });
888     } catch (InterruptedException e) {
889       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
890     }
891   }
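  // Illustrative sketch: obtaining a FileSystem for a fake test user. The user and
  // group names are made up, and "cluster" is assumed to be the test's MiniDFSCluster.
  //
  //   UserGroupInformation ugi =
  //       UserGroupInformation.createUserForTesting("alice", new String[] {"users"});
  //   FileSystem aliceFs = DFSTestUtil.getFileSystemAs(ugi, cluster.getConfiguration(0));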
892 
893   public static byte[] generateSequentialBytes(int start, int length) {
894     byte[] result = new byte[length];
895 
896     for (int i = 0; i < length; i++) {
897       result[i] = (byte) ((start + i) % 127);
898     }
899 
900     return result;
901   }
902 
903   public static Statistics getStatistics(FileSystem fs) {
904     return FileSystem.getStatistics(fs.getUri().getScheme(), fs.getClass());
905   }
906 
907   /**
908    * Load file into byte[]
909    */
910   public static byte[] loadFile(String filename) throws IOException {
911     File file = new File(filename);
912     DataInputStream in = new DataInputStream(new FileInputStream(file));
913     byte[] content = new byte[(int)file.length()];
914     try {
915       in.readFully(content);
916     } finally {
917       IOUtils.cleanup(LOG, in);
918     }
919     return content;
920   }
921 
922   /** For {@link TestTransferRbw} */
923   public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
924       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
925     assertEquals(2, datanodes.length);
926     final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
927         datanodes.length, dfsClient);
928     final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
929     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
930         NetUtils.getOutputStream(s, writeTimeout),
931         HdfsConstants.SMALL_BUFFER_SIZE));
932     final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
933 
934     // send the request
935     new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
936         dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
937         new StorageType[]{StorageType.DEFAULT});
938     out.flush();
939 
940     return BlockOpResponseProto.parseDelimitedFrom(in);
941   }
942 
943   public static void setFederatedConfiguration(MiniDFSCluster cluster,
944       Configuration conf) {
945     Set<String> nameservices = new HashSet<String>();
946     for (NameNodeInfo info : cluster.getNameNodeInfos()) {
947       assert info.nameserviceId != null;
948       nameservices.add(info.nameserviceId);
949       conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
950           info.nameserviceId), DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME,
951               info.nameNode.getNameNodeAddress()).toString());
952       conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
953           info.nameserviceId), DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME,
954               info.nameNode.getNameNodeAddress()).toString());
955     }
956     conf.set(DFSConfigKeys.DFS_NAMESERVICES, Joiner.on(",")
957         .join(nameservices));
958   }
959 
960   public static void setFederatedHAConfiguration(MiniDFSCluster cluster,
961       Configuration conf) {
962     Map<String, List<String>> nameservices = Maps.newHashMap();
963     for (NameNodeInfo info : cluster.getNameNodeInfos()) {
964       Preconditions.checkState(info.nameserviceId != null);
965       List<String> nns = nameservices.get(info.nameserviceId);
966       if (nns == null) {
967         nns = Lists.newArrayList();
968         nameservices.put(info.nameserviceId, nns);
969       }
970       nns.add(info.nnId);
971 
972       conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
973           info.nameserviceId, info.nnId),
974           DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME,
975           info.nameNode.getNameNodeAddress()).toString());
976       conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
977           info.nameserviceId, info.nnId),
978           DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME,
979           info.nameNode.getNameNodeAddress()).toString());
980     }
981     for (Map.Entry<String, List<String>> entry : nameservices.entrySet()) {
982       conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX,
983           entry.getKey()), Joiner.on(",").join(entry.getValue()));
984       conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + entry
985           .getKey(), ConfiguredFailoverProxyProvider.class.getName());
986     }
987     conf.set(DFSConfigKeys.DFS_NAMESERVICES, Joiner.on(",")
988         .join(nameservices.keySet()));
989   }
990 
991   private static DatanodeID getDatanodeID(String ipAddr) {
992     return new DatanodeID(ipAddr, "localhost",
993         UUID.randomUUID().toString(),
994         DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
995         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
996         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
997         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
998   }
999 
1000   public static DatanodeID getLocalDatanodeID() {
1001     return getDatanodeID("127.0.0.1");
1002   }
1003 
1004   public static DatanodeID getLocalDatanodeID(int port) {
1005     return new DatanodeID("127.0.0.1", "localhost",
1006         UUID.randomUUID().toString(),
1007         port, port, port, port);
1008   }
1009 
1010   public static DatanodeDescriptor getLocalDatanodeDescriptor() {
1011     return new DatanodeDescriptor(getLocalDatanodeID());
1012   }
1013 
1014   public static DatanodeInfo getLocalDatanodeInfo() {
1015     return new DatanodeInfo(getLocalDatanodeID());
1016   }
1017 
1018   public static DatanodeInfo getDatanodeInfo(String ipAddr) {
1019     return new DatanodeInfo(getDatanodeID(ipAddr));
1020   }
1021 
1022   public static DatanodeInfo getLocalDatanodeInfo(int port) {
1023     return new DatanodeInfo(getLocalDatanodeID(port));
1024   }
1025 
1026   public static DatanodeInfo getDatanodeInfo(String ipAddr,
1027       String host, int port) {
1028     return new DatanodeInfo(new DatanodeID(ipAddr, host,
1029         UUID.randomUUID().toString(), port,
1030         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
1031         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
1032         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT));
1033   }
1034 
1035   public static DatanodeInfo getLocalDatanodeInfo(String ipAddr,
1036       String hostname, AdminStates adminState) {
1037     return new DatanodeInfo(ipAddr, hostname, "",
1038         DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
1039         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
1040         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
1041         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
1042         1l, 2l, 3l, 4l, 0l, 0l, 0l, 5, 6, "local", adminState);
1043   }
1044 
1045   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
1046       String rackLocation) {
1047     return getDatanodeDescriptor(ipAddr, DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
1048         rackLocation);
1049   }
1050 
1051   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
1052       String rackLocation, String hostname) {
1053     return getDatanodeDescriptor(ipAddr,
1054         DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
1055   }
1056 
1057   public static DatanodeStorageInfo createDatanodeStorageInfo(
1058       String storageID, String ip) {
1059     return createDatanodeStorageInfo(storageID, ip, "defaultRack", "host");
1060   }
1061 
1062   public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) {
1063     return createDatanodeStorageInfos(racks, null);
1064   }
1065 
1066   public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks, String[] hostnames) {
1067     return createDatanodeStorageInfos(racks.length, racks, hostnames);
1068   }
1069 
1070   public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) {
1071     return createDatanodeStorageInfos(n, null, null);
1072   }
1073 
1074   public static DatanodeStorageInfo[] createDatanodeStorageInfos(
1075       int n, String[] racks, String[] hostnames) {
1076     return createDatanodeStorageInfos(n, racks, hostnames, null);
1077   }
1078 
1079   public static DatanodeStorageInfo[] createDatanodeStorageInfos(
1080       int n, String[] racks, String[] hostnames, StorageType[] types) {
1081     DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
1082     for(int i = storages.length; i > 0; ) {
1083       final String storageID = "s" + i;
1084       final String ip = i + "." + i + "." + i + "." + i;
1085       i--;
1086       final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack";
1087       final String hostname = (hostnames!=null && i < hostnames.length)? hostnames[i]: "host";
1088       final StorageType type = (types != null && i < types.length) ? types[i]
1089           : StorageType.DEFAULT;
1090       storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname,
1091           type);
1092     }
1093     return storages;
1094   }
1095 
1096   public static DatanodeStorageInfo createDatanodeStorageInfo(
1097       String storageID, String ip, String rack, String hostname) {
1098     return createDatanodeStorageInfo(storageID, ip, rack, hostname,
1099         StorageType.DEFAULT);
1100   }
1101 
1102   public static DatanodeStorageInfo createDatanodeStorageInfo(
1103       String storageID, String ip, String rack, String hostname,
1104       StorageType type) {
1105     final DatanodeStorage storage = new DatanodeStorage(storageID,
1106         DatanodeStorage.State.NORMAL, type);
1107     final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(
1108         ip, rack, storage, hostname);
1109     return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
1110   }
1111 
1112   public static DatanodeDescriptor[] toDatanodeDescriptor(
1113       DatanodeStorageInfo[] storages) {
1114     DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];
1115     for(int i = 0; i < datanodes.length; i++) {
1116       datanodes[i] = storages[i].getDatanodeDescriptor();
1117     }
1118     return datanodes;
1119   }
1120 
1121   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
1122       int port, String rackLocation, String hostname) {
1123     DatanodeID dnId = new DatanodeID(ipAddr, hostname,
1124         UUID.randomUUID().toString(), port,
1125         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
1126         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
1127         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
1128     return new DatanodeDescriptor(dnId, rackLocation);
1129   }
1130 
1131   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
1132       int port, String rackLocation) {
1133     return getDatanodeDescriptor(ipAddr, port, rackLocation, "host");
1134   }
1135 
1136   public static DatanodeRegistration getLocalDatanodeRegistration() {
1137     return new DatanodeRegistration(getLocalDatanodeID(), new StorageInfo(
1138         NodeType.DATA_NODE), new ExportedBlockKeys(), VersionInfo.getVersion());
1139   }
1140 
1141   /** Copy the contents of one file into another. */
1142   public static void copyFile(File src, File dest) throws IOException {
1143     FileUtils.copyFile(src, dest);
1144   }
1145 
1146   public static class Builder {
1147     private int maxLevels = 3;
1148     private int maxSize = 8*1024;
1149     private int minSize = 1;
1150     private int nFiles = 1;
1151 
1152     public Builder() {
1153     }
1154 
1155     public Builder setName(String string) {
1156       return this;
1157     }
1158 
1159     public Builder setNumFiles(int nFiles) {
1160       this.nFiles = nFiles;
1161       return this;
1162     }
1163 
1164     public Builder setMaxLevels(int maxLevels) {
1165       this.maxLevels = maxLevels;
1166       return this;
1167     }
1168 
1169     public Builder setMaxSize(int maxSize) {
1170       this.maxSize = maxSize;
1171       return this;
1172     }
1173 
1174     public Builder setMinSize(int minSize) {
1175       this.minSize = minSize;
1176       return this;
1177     }
1178 
1179     public DFSTestUtil build() {
1180       return new DFSTestUtil(nFiles, maxLevels, maxSize, minSize);
1181     }
1182   }
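  // Illustrative sketch: the Builder is the usual way tests obtain an instance, e.g.
  // ten files, at most two directory levels deep, between 1 and 4096 bytes each.
  //
  //   DFSTestUtil util = new DFSTestUtil.Builder()
  //       .setName("MyTest").setNumFiles(10).setMaxLevels(2)
  //       .setMinSize(1).setMaxSize(4096).build();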
1183 
1184   /**
1185    * Run a set of operations and generate all edit logs
1186    */
1187   public static void runOperations(MiniDFSCluster cluster,
1188       DistributedFileSystem filesystem, Configuration conf, long blockSize,
1189       int nnIndex) throws IOException {
1190     // create FileContext for rename2
1191     FileContext fc = FileContext.getFileContext(cluster.getURI(0), conf);
1192 
1193     // OP_ADD 0
1194     final Path pathFileCreate = new Path("/file_create");
1195     FSDataOutputStream s = filesystem.create(pathFileCreate);
1196     // OP_CLOSE 9
1197     s.close();
1198     // OP_APPEND 47
1199     FSDataOutputStream s2 = filesystem.append(pathFileCreate, 4096, null);
1200     s2.close();
1201     // OP_SET_STORAGE_POLICY 45
1202     filesystem.setStoragePolicy(pathFileCreate,
1203         HdfsConstants.HOT_STORAGE_POLICY_NAME);
1204     // OP_RENAME_OLD 1
1205     final Path pathFileMoved = new Path("/file_moved");
1206     filesystem.rename(pathFileCreate, pathFileMoved);
1207     // OP_DELETE 2
1208     filesystem.delete(pathFileMoved, false);
1209     // OP_MKDIR 3
1210     Path pathDirectoryMkdir = new Path("/directory_mkdir");
1211     filesystem.mkdirs(pathDirectoryMkdir);
1212     // OP_ALLOW_SNAPSHOT 29
1213     filesystem.allowSnapshot(pathDirectoryMkdir);
1214     // OP_DISALLOW_SNAPSHOT 30
1215     filesystem.disallowSnapshot(pathDirectoryMkdir);
1216     // OP_CREATE_SNAPSHOT 26
1217     String ssName = "snapshot1";
1218     filesystem.allowSnapshot(pathDirectoryMkdir);
1219     filesystem.createSnapshot(pathDirectoryMkdir, ssName);
1220     // OP_RENAME_SNAPSHOT 28
1221     String ssNewName = "snapshot2";
1222     filesystem.renameSnapshot(pathDirectoryMkdir, ssName, ssNewName);
1223     // OP_DELETE_SNAPSHOT 27
1224     filesystem.deleteSnapshot(pathDirectoryMkdir, ssNewName);
1225     // OP_SET_REPLICATION 4
1226     s = filesystem.create(pathFileCreate);
1227     s.close();
1228     filesystem.setReplication(pathFileCreate, (short)1);
1229     // OP_SET_PERMISSIONS 7
1230     Short permission = 0777;
1231     filesystem.setPermission(pathFileCreate, new FsPermission(permission));
1232     // OP_SET_OWNER 8
1233     filesystem.setOwner(pathFileCreate, "newOwner", null);
1234     // OP_CLOSE 9 see above
1235     // OP_SET_GENSTAMP 10 see above
1236     // OP_SET_NS_QUOTA 11 obsolete
1237     // OP_CLEAR_NS_QUOTA 12 obsolete
1238     // OP_TIMES 13
1239     long mtime = 1285195527000L; // Wed, 22 Sep 2010 22:45:27 GMT
1240     long atime = mtime;
1241     filesystem.setTimes(pathFileCreate, mtime, atime);
1242     // OP_SET_QUOTA 14
1243     filesystem.setQuota(pathDirectoryMkdir, 1000L,
1244         HdfsConstants.QUOTA_DONT_SET);
1245     // OP_SET_QUOTA_BY_STORAGETYPE
1246     filesystem.setQuotaByStorageType(pathDirectoryMkdir, StorageType.SSD, 888L);
1247     // OP_RENAME 15
1248     fc.rename(pathFileCreate, pathFileMoved, Rename.NONE);
1249     // OP_CONCAT_DELETE 16
1250     Path   pathConcatTarget = new Path("/file_concat_target");
1251     Path[] pathConcatFiles  = new Path[2];
1252     pathConcatFiles[0]      = new Path("/file_concat_0");
1253     pathConcatFiles[1]      = new Path("/file_concat_1");
1254 
1255     long length = blockSize * 3; // multiple of blocksize for concat
1256     short replication = 1;
1257     long seed = 1;
1258     DFSTestUtil.createFile(filesystem, pathConcatTarget, length, replication,
1259         seed);
1260     DFSTestUtil.createFile(filesystem, pathConcatFiles[0], length, replication,
1261         seed);
1262     DFSTestUtil.createFile(filesystem, pathConcatFiles[1], length, replication,
1263         seed);
1264     filesystem.concat(pathConcatTarget, pathConcatFiles);
1265 
1266     // OP_TRUNCATE 46
1267     length = blockSize * 2;
1268     DFSTestUtil.createFile(filesystem, pathFileCreate, length, replication,
1269         seed);
1270     filesystem.truncate(pathFileCreate, blockSize);
1271 
1272     // OP_SYMLINK 17
1273     Path pathSymlink = new Path("/file_symlink");
1274     fc.createSymlink(pathConcatTarget, pathSymlink, false);
1275 
1276     // OP_REASSIGN_LEASE 22
1277     String filePath = "/hard-lease-recovery-test";
1278     byte[] bytes = "foo-bar-baz".getBytes();
1279     DFSClientAdapter.stopLeaseRenewer(filesystem);
1280     FSDataOutputStream leaseRecoveryPath = filesystem.create(new Path(filePath));
1281     leaseRecoveryPath.write(bytes);
1282     leaseRecoveryPath.hflush();
1283     // Set the hard lease timeout to 1 second.
1284     cluster.setLeasePeriod(60 * 1000, 1000, nnIndex);
1285     // wait for lease recovery to complete
1286     LocatedBlocks locatedBlocks;
1287     do {
1288       try {
1289         Thread.sleep(1000);
1290       } catch (InterruptedException e) {}
1291       locatedBlocks = DFSClientAdapter.callGetBlockLocations(
1292           cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
1293     } while (locatedBlocks.isUnderConstruction());
1294     // OP_ADD_CACHE_POOL
1295     filesystem.addCachePool(new CachePoolInfo("pool1"));
1296     // OP_MODIFY_CACHE_POOL
1297     filesystem.modifyCachePool(new CachePoolInfo("pool1").setLimit(99l));
1298     // OP_ADD_PATH_BASED_CACHE_DIRECTIVE
1299     long id = filesystem.addCacheDirective(
1300         new CacheDirectiveInfo.Builder().
1301             setPath(new Path("/path")).
1302             setReplication((short)1).
1303             setPool("pool1").
1304             build(), EnumSet.of(CacheFlag.FORCE));
1305     // OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE
1306     filesystem.modifyCacheDirective(
1307         new CacheDirectiveInfo.Builder().
1308             setId(id).
1309             setReplication((short)2).
1310             build(), EnumSet.of(CacheFlag.FORCE));
1311     // OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE
1312     filesystem.removeCacheDirective(id);
1313     // OP_REMOVE_CACHE_POOL
1314     filesystem.removeCachePool("pool1");
1315     // OP_SET_ACL
1316     List<AclEntry> aclEntryList = Lists.newArrayList();
1317     aclEntryList.add(
1318         new AclEntry.Builder()
1319             .setPermission(FsAction.READ_WRITE)
1320             .setScope(AclEntryScope.ACCESS)
1321             .setType(AclEntryType.USER)
1322             .build());
1323     aclEntryList.add(
1324         new AclEntry.Builder()
1325             .setName("user")
1326             .setPermission(FsAction.READ_WRITE)
1327             .setScope(AclEntryScope.ACCESS)
1328             .setType(AclEntryType.USER)
1329             .build());
1330     aclEntryList.add(
1331         new AclEntry.Builder()
1332             .setPermission(FsAction.WRITE)
1333             .setScope(AclEntryScope.ACCESS)
1334             .setType(AclEntryType.GROUP)
1335             .build());
1336     aclEntryList.add(
1337         new AclEntry.Builder()
1338             .setPermission(FsAction.NONE)
1339             .setScope(AclEntryScope.ACCESS)
1340             .setType(AclEntryType.OTHER)
1341             .build());
1342     filesystem.setAcl(pathConcatTarget, aclEntryList);
1343     // OP_SET_XATTR
1344     filesystem.setXAttr(pathConcatTarget, "user.a1",
1345         new byte[]{0x31, 0x32, 0x33});
1346     filesystem.setXAttr(pathConcatTarget, "user.a2",
1347         new byte[]{0x37, 0x38, 0x39});
1348     // OP_REMOVE_XATTR
1349     filesystem.removeXAttr(pathConcatTarget, "user.a2");
1350   }
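  // Illustrative sketch, assuming a running MiniDFSCluster named "cluster" whose
  // configured block size matches the value passed in: edit-log tests typically run
  // this against namenode 0 to populate every opcode.
  //
  //   DFSTestUtil.runOperations(cluster, cluster.getFileSystem(),
  //       cluster.getConfiguration(0), 64 * 1024, 0);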
1351 
1352   public static void abortStream(DFSOutputStream out) throws IOException {
1353     out.abort();
1354   }
1355 
1356   public static byte[] asArray(ByteBuffer buf) {
1357     byte arr[] = new byte[buf.remaining()];
1358     buf.duplicate().get(arr);
1359     return arr;
1360   }
1361 
1362   /**
1363    * Blocks until cache usage hits the expected new value.
1364    */
1365   public static long verifyExpectedCacheUsage(final long expectedCacheUsed,
1366       final long expectedBlocks, final FsDatasetSpi<?> fsd) throws Exception {
1367     GenericTestUtils.waitFor(new Supplier<Boolean>() {
1368       private int tries = 0;
1369 
1370       @Override
1371       public Boolean get() {
1372         long curCacheUsed = fsd.getCacheUsed();
1373         long curBlocks = fsd.getNumBlocksCached();
1374         if ((curCacheUsed != expectedCacheUsed) ||
1375             (curBlocks != expectedBlocks)) {
1376           if (tries++ > 10) {
1377             LOG.info("verifyExpectedCacheUsage: have " +
1378                 curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
1379                 curBlocks + "/" + expectedBlocks + " blocks cached. " +
1380                 "memlock limit = " +
1381                 NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
1382                 ".  Waiting...");
1383           }
1384           return false;
1385         }
1386         LOG.info("verifyExpectedCacheUsage: got " +
1387             curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
1388             curBlocks + "/" + expectedBlocks + " blocks cached. " +
1389             "memlock limit = " +
1390             NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
1391         return true;
1392       }
1393     }, 100, 60000);
1394     return expectedCacheUsed;
1395   }
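
  /*
   * Usage sketch (hypothetical; assumes an FsDatasetSpi "fsd" obtained from a
   * DataNode under test and a BLOCK_SIZE constant defined by the test):
   *
   *   DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
   */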

  /**
   * Round a long value up to a multiple of a factor.
   *
   * @param val    The value.
   * @param factor The factor to round up to.  Must be > 1.
   * @return       The rounded value.
   */
  public static long roundUpToMultiple(long val, int factor) {
    assert (factor > 1);
    long c = (val + factor - 1) / factor;
    return c * factor;
  }
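
  /*
   * For example (a quick sanity check, not an existing test):
   *   roundUpToMultiple(1000, 512) returns 1024
   *   roundUpToMultiple(1024, 512) returns 1024
   *   roundUpToMultiple(1, 3)      returns 3
   */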

  public static void checkComponentsEquals(byte[][] expected, byte[][] actual) {
    assertEquals("expected: " + DFSUtil.byteArray2PathString(expected)
        + ", actual: " + DFSUtil.byteArray2PathString(actual), expected.length,
        actual.length);
    int i = 0;
    for (byte[] e : expected) {
      byte[] actualComponent = actual[i++];
      assertTrue("expected: " + DFSUtil.bytes2String(e) + ", actual: "
          + DFSUtil.bytes2String(actualComponent),
          Arrays.equals(e, actualComponent));
    }
  }

  /**
   * A short-circuit test context which makes it easier to get a short-circuit
   * configuration and set everything up.
   */
  public static class ShortCircuitTestContext implements Closeable {
    private final String testName;
    private final TemporarySocketDirectory sockDir;
    private boolean closed = false;
    private final boolean formerTcpReadsDisabled;

    public ShortCircuitTestContext(String testName) {
      this.testName = testName;
      this.sockDir = new TemporarySocketDirectory();
      DomainSocket.disableBindPathValidation();
      formerTcpReadsDisabled = DFSInputStream.tcpReadsDisabledForTesting;
      Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
    }

    public Configuration newConfiguration() {
      Configuration conf = new Configuration();
      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
          new File(sockDir.getDir(),
            testName + "._PORT.sock").getAbsolutePath());
      return conf;
    }

    public String getTestName() {
      return testName;
    }

    public void close() throws IOException {
      if (closed) return;
      closed = true;
      DFSInputStream.tcpReadsDisabledForTesting = formerTcpReadsDisabled;
      sockDir.close();
    }
  }
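
  /*
   * Usage sketch for ShortCircuitTestContext (hypothetical test body; the test
   * name and cluster setup are placeholders):
   *
   *   ShortCircuitTestContext testContext =
   *       new ShortCircuitTestContext("testShortCircuitRead");
   *   try {
   *     Configuration conf = testContext.newConfiguration();
   *     MiniDFSCluster cluster =
   *         new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
   *     // ... perform short-circuit reads against cluster.getFileSystem() ...
   *     cluster.shutdown();
   *   } finally {
   *     testContext.close();
   *   }
   */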

  /**
   * Verify that two files have the same contents.
   *
   * @param fs The file system containing the two files.
   * @param p1 The path of the first file.
   * @param p2 The path of the second file.
   * @param len The length of the two files.
   * @throws IOException
   */
  public static void verifyFilesEqual(FileSystem fs, Path p1, Path p2, int len)
      throws IOException {
    final FSDataInputStream in1 = fs.open(p1);
    final FSDataInputStream in2 = fs.open(p2);
    try {
      for (int i = 0; i < len; i++) {
        assertEquals("Mismatch at byte " + i, in1.read(), in2.read());
      }
    } finally {
      in1.close();
      in2.close();
    }
  }

  /**
   * Verify that two files have different contents.
   *
   * @param fs The file system containing the two files.
   * @param p1 The path of the first file.
   * @param p2 The path of the second file.
   * @param len The length of the two files.
   * @throws IOException
   */
  public static void verifyFilesNotEqual(FileSystem fs, Path p1, Path p2,
      int len)
          throws IOException {
    final FSDataInputStream in1 = fs.open(p1);
    final FSDataInputStream in2 = fs.open(p2);
    try {
      for (int i = 0; i < len; i++) {
        if (in1.read() != in2.read()) {
          return;
        }
      }
      fail("files are equal, but should not be");
    } finally {
      in1.close();
      in2.close();
    }
  }
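
  /*
   * Usage sketch (hypothetical paths; assumes both files are LEN bytes long):
   *
   *   DFSTestUtil.verifyFilesEqual(fs, new Path("/src"), new Path("/copy"), LEN);
   *   DFSTestUtil.verifyFilesNotEqual(fs, new Path("/plain"), new Path("/enc"), LEN);
   */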

  /**
   * Helper function that verifies that the blocks of a file are placed on the
   * expected storage type.
   *
   * @param fs The file system containing the file.
   * @param client The DFS client used to access the file.
   * @param path The path of the file to verify.
   * @param storageType The expected storage type.
   * @return true if the file exists and all of its blocks are located on the
   *         expected storage type; false otherwise.
   */
  public static boolean verifyFileReplicasOnStorageType(FileSystem fs,
    DFSClient client, Path path, StorageType storageType) throws IOException {
    if (!fs.exists(path)) {
      LOG.info("verifyFileReplicasOnStorageType: file " + path +
          " does not exist");
      return false;
    }
    long fileLength = client.getFileInfo(path.toString()).getLen();
    LocatedBlocks locatedBlocks =
      client.getLocatedBlocks(path.toString(), 0, fileLength);
    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
      if (locatedBlock.getStorageTypes()[0] != storageType) {
        LOG.info("verifyFileReplicasOnStorageType: for file " + path +
            ", expected block " + locatedBlock +
            " on storage type " + storageType + " but found type " +
            locatedBlock.getStorageTypes()[0]);
        return false;
      }
    }
    return true;
  }
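
  /*
   * Usage sketch (hypothetical file; assumes it was written under a policy
   * that places all replicas on SSD):
   *
   *   DistributedFileSystem dfs = cluster.getFileSystem();
   *   assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(
   *       dfs, dfs.getClient(), new Path("/ssdFile"), StorageType.SSD));
   */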

  /**
   * Helper function to create a key in the Key Provider. Defaults
   * to the first indexed NameNode's Key Provider.
   *
   * @param keyName The name of the key to create
   * @param cluster The cluster to create it in
   * @param conf Configuration to use
   */
  public static void createKey(String keyName, MiniDFSCluster cluster,
                                Configuration conf)
          throws NoSuchAlgorithmException, IOException {
    createKey(keyName, cluster, 0, conf);
  }

  /**
   * Helper function to create a key in the Key Provider.
   *
   * @param keyName The name of the key to create
   * @param cluster The cluster to create it in
   * @param idx The NameNode index
   * @param conf Configuration to use
   */
  public static void createKey(String keyName, MiniDFSCluster cluster,
                               int idx, Configuration conf)
      throws NoSuchAlgorithmException, IOException {
    NameNode nn = cluster.getNameNode(idx);
    KeyProvider provider = nn.getNamesystem().getProvider();
    final KeyProvider.Options options = KeyProvider.options(conf);
    options.setDescription(keyName);
    options.setBitLength(128);
    provider.createKey(keyName, options);
    provider.flush();
  }
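
  /*
   * Usage sketch (hypothetical key and zone names; the HdfsAdmin call is an
   * assumption of this sketch and is not required by createKey itself):
   *
   *   DFSTestUtil.createKey("testKey", cluster, conf);
   *   new HdfsAdmin(cluster.getURI(), conf)
   *       .createEncryptionZone(new Path("/zone"), "testKey");
   */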

  /**
   * @return the node which is expected to run the recovery of the
   * given block, which is known to be under construction inside the
   * given NameNode.
   */
  public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
      ExtendedBlock blk) {
    BlockManager bm0 = nn.getNamesystem().getBlockManager();
    BlockInfoContiguous storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
    assertTrue("Block " + blk + " should be under construction, " +
        "got: " + storedBlock,
        storedBlock instanceof BlockInfoContiguousUnderConstruction);
    BlockInfoContiguousUnderConstruction ucBlock =
      (BlockInfoContiguousUnderConstruction)storedBlock;
    // We expect that the replica with the most recent heartbeat will be
    // the one to be in charge of the synchronization / recovery protocol.
    final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
    DatanodeStorageInfo expectedPrimary = storages[0];
    long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor()
        .getLastUpdateMonotonic();
    for (int i = 1; i < storages.length; i++) {
      final long lastUpdate = storages[i].getDatanodeDescriptor()
          .getLastUpdateMonotonic();
      if (lastUpdate > mostRecentLastUpdate) {
        expectedPrimary = storages[i];
        mostRecentLastUpdate = lastUpdate;
      }
    }
    return expectedPrimary.getDatanodeDescriptor();
  }
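
  /*
   * Usage sketch (hypothetical; the block must belong to an open,
   * under-construction file, and getFirstBlock is assumed to be the helper
   * defined earlier in this class):
   *
   *   ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, new Path("/openFile"));
   *   DatanodeDescriptor primary =
   *       DFSTestUtil.getExpectedPrimaryNode(cluster.getNameNode(), blk);
   */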

  public static void toolRun(Tool tool, String cmd, int retcode, String contain)
      throws Exception {
    String[] cmds = StringUtils.split(cmd, ' ');
    System.out.flush();
    System.err.flush();
    PrintStream origOut = System.out;
    PrintStream origErr = System.err;
    String output = null;
    int ret = 0;
    try {
      ByteArrayOutputStream bs = new ByteArrayOutputStream(1024);
      PrintStream out = new PrintStream(bs);
      System.setOut(out);
      System.setErr(out);
      ret = tool.run(cmds);
      System.out.flush();
      System.err.flush();
      out.close();
      output = bs.toString();
    } finally {
      System.setOut(origOut);
      System.setErr(origErr);
    }
    System.out.println("Output for command: " + cmd + " retcode: " + ret);
    if (output != null) {
      System.out.println(output);
    }
    assertEquals(retcode, ret);
    if (contain != null) {
      assertTrue("The real output is: " + output + ".\n It should contain: "
          + contain, output.contains(contain));
    }
  }

  public static void FsShellRun(String cmd, int retcode, String contain,
      Configuration conf) throws Exception {
    FsShell shell = new FsShell(new Configuration(conf));
    toolRun(shell, cmd, retcode, contain);
  }

  public static void DFSAdminRun(String cmd, int retcode, String contain,
      Configuration conf) throws Exception {
    DFSAdmin admin = new DFSAdmin(new Configuration(conf));
    toolRun(admin, cmd, retcode, contain);
  }

  public static void FsShellRun(String cmd, Configuration conf)
      throws Exception {
    FsShellRun(cmd, 0, null, conf);
  }
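
  /*
   * Usage sketch (hypothetical paths; each call asserts the tool's exit code
   * and, optionally, a substring of its captured output):
   *
   *   DFSTestUtil.FsShellRun("-mkdir /dir", conf);
   *   DFSTestUtil.FsShellRun("-ls /", 0, "/dir", conf);
   *   DFSTestUtil.DFSAdminRun("-safemode get", 0, "Safe mode is OFF", conf);
   */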

  public static void addDataNodeLayoutVersion(final int lv, final String description)
      throws NoSuchFieldException, IllegalAccessException {
    Preconditions.checkState(lv < DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION);

    // Override {@link DataNodeLayoutVersion#CURRENT_LAYOUT_VERSION} via reflection.
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    Field field = DataNodeLayoutVersion.class.getField("CURRENT_LAYOUT_VERSION");
    field.setAccessible(true);
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.setInt(null, lv);

    // Override {@link HdfsConstants#DATANODE_LAYOUT_VERSION}
    field = HdfsConstants.class.getField("DATANODE_LAYOUT_VERSION");
    field.setAccessible(true);
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.setInt(null, lv);

    // Inject the feature into the FEATURES map.
    final LayoutVersion.FeatureInfo featureInfo =
        new LayoutVersion.FeatureInfo(lv, lv + 1, description, false);
    final LayoutVersion.LayoutFeature feature =
        new LayoutVersion.LayoutFeature() {
      @Override
      public LayoutVersion.FeatureInfo getInfo() {
        return featureInfo;
      }
    };

    // Update the FEATURES map with the new layout version.
    LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES,
                            new LayoutVersion.LayoutFeature[] { feature });
  }
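
  /*
   * Usage sketch (hypothetical feature; note that layout versions are negative
   * and decrease as new features are added, so CURRENT_LAYOUT_VERSION - 1 is a
   * "newer" version that satisfies the precondition above):
   *
   *   DFSTestUtil.addDataNodeLayoutVersion(
   *       DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1,
   *       "Test feature for upgrade tests");
   */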

  /**
   * Wait up to waitTime milliseconds for the given datanode to be reported
   * as alive (or dead) by the NameNode.
   */
  public static void waitForDatanodeState(
      final MiniDFSCluster cluster, final String nodeID,
      final boolean alive, int waitTime)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        FSNamesystem namesystem = cluster.getNamesystem();
        final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
            namesystem, nodeID);
        return (dd.isAlive == alive);
      }
    }, 100, waitTime);
  }
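
  /*
   * Usage sketch (hypothetical; assumes the node is identified by its datanode
   * UUID and waits up to 20 seconds for it to be reported dead after a stop):
   *
   *   String dnId = cluster.getDataNodes().get(0).getDatanodeUuid();
   *   cluster.stopDataNode(0);
   *   DFSTestUtil.waitForDatanodeState(cluster, dnId, false, 20000);
   */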

  public static void setNameNodeLogLevel(Level level) {
    GenericTestUtils.setLogLevel(FSNamesystem.LOG, level);
    GenericTestUtils.setLogLevel(BlockManager.LOG, level);
    GenericTestUtils.setLogLevel(LeaseManager.LOG, level);
    GenericTestUtils.setLogLevel(NameNode.LOG, level);
    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
    GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
  }

  /**
   * Change the on-disk length of a block replica at datanode dnIndex by
   * lenDelta bytes.
   */
  public static boolean changeReplicaLength(MiniDFSCluster cluster,
      ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
    File blockFile = cluster.getBlockFile(dnIndex, blk);
    if (blockFile != null && blockFile.exists()) {
      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
      raFile.setLength(raFile.length()+lenDelta);
      raFile.close();
      return true;
    }
    LOG.info("failed to change length of block " + blk);
    return false;
  }

  /**
   * Mark the datanode as dead by zeroing its last-update timestamps.
   */
  public static void setDatanodeDead(DatanodeInfo dn) {
    dn.setLastUpdate(0);
    dn.setLastUpdateMonotonic(0);
  }

  /**
   * Set lastUpdate and lastUpdateMonotonic to the current time plus the given
   * offset.
   */
  public static void resetLastUpdatesWithOffset(DatanodeInfo dn, long offset) {
    dn.setLastUpdate(Time.now() + offset);
    dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
  }
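
  /*
   * Usage sketch (hypothetical; BlockManagerTestUtil.checkHeartbeat is assumed
   * to trigger the heartbeat check that notices the dead node):
   *
   *   DatanodeDescriptor dnd = ...; // obtained from the DatanodeManager
   *   DFSTestUtil.setDatanodeDead(dnd);
   *   BlockManagerTestUtil.checkHeartbeat(
   *       cluster.getNamesystem().getBlockManager());
   */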

  public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
      Block block, BlockStatus blockStatus, DatanodeStorage storage) {
    ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
    receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
    StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
    reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
    return reports;
  }

  /**
   * Adds a block to a file.
   * This method only manipulates the NameNode's state of the file and the
   * block, without injecting any data into the DataNodes. It mimics the
   * incremental block reports that the DataNodes would otherwise send.
   * Disable periodic heartbeats before using this method.
   * @param dataNodes List of DataNodes to host the block
   * @param previous Previous block in the file
   * @param len block size
   * @return The added block
   */
  public static Block addBlockToFile(
      List<DataNode> dataNodes, DistributedFileSystem fs, FSNamesystem ns,
      String file, INodeFile fileNode,
      String clientName, ExtendedBlock previous, int len)
      throws Exception {
    fs.getClient().namenode.addBlock(file, clientName, previous, null,
        fileNode.getId(), null);

    final BlockInfoContiguous lastBlock =
        fileNode.getLastBlock();
    final int groupSize = fileNode.getBlockReplication();
    assert dataNodes.size() >= groupSize;
    // 1. RECEIVING_BLOCK IBR
    for (int i = 0; i < groupSize; i++) {
      DataNode dn = dataNodes.get(i);
      final Block block = new Block(lastBlock.getBlockId() + i, 0,
          lastBlock.getGenerationStamp());
      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
          .makeReportForReceivedBlock(block,
              ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
      for (StorageReceivedDeletedBlocks report : reports) {
        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
      }
    }

    // 2. RECEIVED_BLOCK IBR
    for (int i = 0; i < groupSize; i++) {
      DataNode dn = dataNodes.get(i);
      final Block block = new Block(lastBlock.getBlockId() + i,
          len, lastBlock.getGenerationStamp());
      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
          .makeReportForReceivedBlock(block,
              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
      for (StorageReceivedDeletedBlocks report : reports) {
        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
      }
    }
    lastBlock.setNumBytes(len);
    return lastBlock;
  }
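
  /*
   * Usage sketch for addBlockToFile (hypothetical; assumes periodic heartbeats
   * are already disabled and that the test has resolved the file's INodeFile,
   * e.g. through the namesystem's FSDirectory):
   *
   *   Block newBlock = DFSTestUtil.addBlockToFile(cluster.getDataNodes(), fs,
   *       cluster.getNamesystem(), "/file", fileNode, "clientName", null, 1024);
   */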
}