1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs;
19 
20 import java.io.File;
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.net.InetSocketAddress;
24 import java.net.URI;
25 import java.net.URISyntaxException;
26 import java.nio.channels.FileChannel;
27 import java.security.PrivilegedExceptionAction;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.Random;
32 
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.FileUtil;
36 import org.apache.hadoop.hdfs.protocol.Block;
37 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
38 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
39 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
40 import org.apache.hadoop.hdfs.server.datanode.DataNode;
41 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
42 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
43 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
44 import org.apache.hadoop.hdfs.server.namenode.NameNode;
45 import org.apache.hadoop.hdfs.tools.DFSAdmin;
46 import org.apache.hadoop.net.DNSToSwitchMapping;
47 import org.apache.hadoop.net.NetUtils;
48 import org.apache.hadoop.net.StaticMapping;
49 import org.apache.hadoop.security.UserGroupInformation;
50 import org.apache.hadoop.util.StringUtils;
51 import org.apache.hadoop.util.ToolRunner;
52 
53 /**
54  * This class creates a single-process DFS cluster for junit testing.
55  * The data directories for non-simulated DFS are under the testing directory.
56  * For simulated data nodes, no underlying fs storage is used.
57  */
58 public class MiniDFSCluster {
59 
60   public class DataNodeProperties {
61     DataNode datanode;
62     Configuration conf;
63     String[] dnArgs;
64 
DataNodeProperties(DataNode node, Configuration conf, String[] args)65     DataNodeProperties(DataNode node, Configuration conf, String[] args) {
66       this.datanode = node;
67       this.conf = conf;
68       this.dnArgs = args;
69     }
70   }
71 
72   private Configuration conf;
73   protected NameNode nameNode;
74   protected int numDataNodes;
75   protected List<DataNodeProperties> dataNodes =
76                          new ArrayList<DataNodeProperties>();
77   private File base_dir;
78   protected File data_dir;
79 
80   /**
81    * This null constructor is used only when wishing to start a data node cluster
82    * without a name node (ie when the name node is started elsewhere).
83    */
MiniDFSCluster()84   public MiniDFSCluster() {
85   }
86 
87   /**
88    * Modify the config and start up the servers with the given operation.
89    * Servers will be started on free ports.
90    * <p>
91    * The caller must manage the creation of NameNode and DataNode directories
92    * and have already set dfs.name.dir and dfs.data.dir in the given conf.
93    *
94    * @param conf the base configuration to use in starting the servers.  This
95    *          will be modified as necessary.
96    * @param numDataNodes Number of DataNodes to start; may be zero
97    * @param nameNodeOperation the operation with which to start the servers.  If null
98    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
99    */
MiniDFSCluster(Configuration conf, int numDataNodes, StartupOption nameNodeOperation)100   public MiniDFSCluster(Configuration conf,
101                         int numDataNodes,
102                         StartupOption nameNodeOperation) throws IOException {
103     this(0, conf, numDataNodes, false, false, false,  nameNodeOperation,
104           null, null, null);
105   }
106 
107   /**
108    * Modify the config and start up the servers.  The rpc and info ports for
109    * servers are guaranteed to use free ports.
110    * <p>
111    * NameNode and DataNode directory creation and configuration will be
112    * managed by this class.
113    *
114    * @param conf the base configuration to use in starting the servers.  This
115    *          will be modified as necessary.
116    * @param numDataNodes Number of DataNodes to start; may be zero
117    * @param format if true, format the NameNode and DataNodes before starting up
118    * @param racks array of strings indicating the rack that each DataNode is on
119    */
MiniDFSCluster(Configuration conf, int numDataNodes, boolean format, String[] racks)120   public MiniDFSCluster(Configuration conf,
121                         int numDataNodes,
122                         boolean format,
123                         String[] racks) throws IOException {
124     this(0, conf, numDataNodes, format, true, true,  null, racks, null, null);
125   }
126 
127   /**
128    * Modify the config and start up the servers.  The rpc and info ports for
129    * servers are guaranteed to use free ports.
130    * <p>
131    * NameNode and DataNode directory creation and configuration will be
132    * managed by this class.
133    *
134    * @param conf the base configuration to use in starting the servers.  This
135    *          will be modified as necessary.
136    * @param numDataNodes Number of DataNodes to start; may be zero
137    * @param format if true, format the NameNode and DataNodes before starting up
138    * @param racks array of strings indicating the rack that each DataNode is on
139    * @param hosts array of strings indicating the hostname for each DataNode
140    */
MiniDFSCluster(Configuration conf, int numDataNodes, boolean format, String[] racks, String[] hosts)141   public MiniDFSCluster(Configuration conf,
142                         int numDataNodes,
143                         boolean format,
144                         String[] racks, String[] hosts) throws IOException {
145     this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null);
146   }
147 
148   /**
149    * NOTE: if possible, the other constructors that don't have nameNode port
150    * parameter should be used as they will ensure that the servers use free ports.
151    * <p>
152    * Modify the config and start up the servers.
153    *
154    * @param nameNodePort suggestion for which rpc port to use.  caller should
155    *          use getNameNodePort() to get the actual port used.
156    * @param conf the base configuration to use in starting the servers.  This
157    *          will be modified as necessary.
158    * @param numDataNodes Number of DataNodes to start; may be zero
159    * @param format if true, format the NameNode and DataNodes before starting up
160    * @param manageDfsDirs if true, the data directories for servers will be
161    *          created and dfs.name.dir and dfs.data.dir will be set in the conf
162    * @param operation the operation with which to start the servers.  If null
163    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
164    * @param racks array of strings indicating the rack that each DataNode is on
165    */
MiniDFSCluster(int nameNodePort, Configuration conf, int numDataNodes, boolean format, boolean manageDfsDirs, StartupOption operation, String[] racks)166   public MiniDFSCluster(int nameNodePort,
167                         Configuration conf,
168                         int numDataNodes,
169                         boolean format,
170                         boolean manageDfsDirs,
171                         StartupOption operation,
172                         String[] racks) throws IOException {
173     this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
174          operation, racks, null, null);
175   }
176 
177   /**
178    * NOTE: if possible, the other constructors that don't have nameNode port
179    * parameter should be used as they will ensure that the servers use free ports.
180    * <p>
181    * Modify the config and start up the servers.
182    *
183    * @param nameNodePort suggestion for which rpc port to use.  caller should
184    *          use getNameNodePort() to get the actual port used.
185    * @param conf the base configuration to use in starting the servers.  This
186    *          will be modified as necessary.
187    * @param numDataNodes Number of DataNodes to start; may be zero
188    * @param format if true, format the NameNode and DataNodes before starting up
189    * @param manageDfsDirs if true, the data directories for servers will be
190    *          created and dfs.name.dir and dfs.data.dir will be set in the conf
191    * @param operation the operation with which to start the servers.  If null
192    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
193    * @param racks array of strings indicating the rack that each DataNode is on
194    * @param simulatedCapacities array of capacities of the simulated data nodes
195    */
MiniDFSCluster(int nameNodePort, Configuration conf, int numDataNodes, boolean format, boolean manageDfsDirs, StartupOption operation, String[] racks, long[] simulatedCapacities)196   public MiniDFSCluster(int nameNodePort,
197                         Configuration conf,
198                         int numDataNodes,
199                         boolean format,
200                         boolean manageDfsDirs,
201                         StartupOption operation,
202                         String[] racks,
203                         long[] simulatedCapacities) throws IOException {
204     this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
205           operation, racks, null, simulatedCapacities);
206   }
207 
208   /**
209    * NOTE: if possible, the other constructors that don't have nameNode port
210    * parameter should be used as they will ensure that the servers use free ports.
211    * <p>
212    * Modify the config and start up the servers.
213    *
214    * @param nameNodePort suggestion for which rpc port to use.  caller should
215    *          use getNameNodePort() to get the actual port used.
216    * @param conf the base configuration to use in starting the servers.  This
217    *          will be modified as necessary.
218    * @param numDataNodes Number of DataNodes to start; may be zero
219    * @param format if true, format the NameNode and DataNodes before starting up
   * @param manageNameDfsDirs if true, dfs.name.dir and fs.checkpoint.dir will
   *          be set in the conf to locations under the test data directory
222    * @param manageDataDfsDirs if true, the data directories for datanodes will
223    *          be created and dfs.data.dir set to same in the conf
224    * @param operation the operation with which to start the servers.  If null
225    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
226    * @param racks array of strings indicating the rack that each DataNode is on
227    * @param hosts array of strings indicating the hostnames of each DataNode
228    * @param simulatedCapacities array of capacities of the simulated data nodes
229    */
MiniDFSCluster(int nameNodePort, Configuration conf, int numDataNodes, boolean format, boolean manageNameDfsDirs, boolean manageDataDfsDirs, StartupOption operation, String[] racks, String hosts[], long[] simulatedCapacities)230   public MiniDFSCluster(int nameNodePort,
231                         Configuration conf,
232                         int numDataNodes,
233                         boolean format,
234                         boolean manageNameDfsDirs,
235                         boolean manageDataDfsDirs,
236                         StartupOption operation,
237                         String[] racks, String hosts[],
238                         long[] simulatedCapacities) throws IOException {
239     this.conf = conf;
240     base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");
241     data_dir = new File(base_dir, "data");
242 
243     // Setup the NameNode configuration
244     FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
245     conf.set("dfs.http.address", "127.0.0.1:0");
246     if (manageNameDfsDirs) {
247       conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
248                new File(base_dir, "name2").getPath());
249       conf.set("fs.checkpoint.dir", new File(base_dir, "namesecondary1").
250                 getPath()+"," + new File(base_dir, "namesecondary2").getPath());
251     }
252 
253     int replication = conf.getInt("dfs.replication", 3);
254     conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
255     int safemodeExtension = conf.getInt("dfs.safemode.extension.testing", 0);
256     conf.setInt("dfs.safemode.extension", safemodeExtension);
257     conf.setInt("dfs.namenode.decommission.interval", 3); // 3 second
258 
259     // Set a small delay on blockReceived in the minicluster to approximate
260     // a real cluster a little better and suss out bugs.
261     conf.setInt("dfs.datanode.artificialBlockReceivedDelay", 5);
262 
263     // Format and clean out DataNode directories
264     if (format) {
265       if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
266         throw new IOException("Cannot remove data directory: " + data_dir);
267       }
268       NameNode.format(conf);
269     }
270 
271     // Start the NameNode
272     String[] args = (operation == null ||
273                      operation == StartupOption.FORMAT ||
274                      operation == StartupOption.REGULAR) ?
275       new String[] {} : new String[] {operation.getName()};
276     conf.setClass("topology.node.switch.mapping.impl",
277                    StaticMapping.class, DNSToSwitchMapping.class);
278     nameNode = NameNode.createNameNode(args, conf);
279 
280     if (operation == StartupOption.RECOVER) {
281       return;
282     }
283     // Start the DataNodes
284     startDataNodes(conf, numDataNodes, manageDataDfsDirs,
285                     operation, racks, hosts, simulatedCapacities);
286     waitClusterUp();
287   }
288 
289   /**
290    * wait for the cluster to get out of
291    * safemode.
292    */
waitClusterUp()293   public void waitClusterUp() {
294     if (numDataNodes > 0) {
295       while (!isClusterUp()) {
296         try {
297           System.err.println("Waiting for the Mini HDFS Cluster to start...");
298           Thread.sleep(1000);
299         } catch (InterruptedException e) {
300         }
301       }
302     }
303   }
304 
305   /**
306    * Modify the config and start up additional DataNodes.  The info port for
307    * DataNodes is guaranteed to use a free port.
308    *
309    *  Data nodes can run with the name node in the mini cluster or
310    *  a real name node. For example, running with a real name node is useful
311    *  when running simulated data nodes with a real name node.
312    *  If minicluster's name node is null assume that the conf has been
313    *  set with the right address:port of the name node.
314    *
315    * @param conf the base configuration to use in starting the DataNodes.  This
316    *          will be modified as necessary.
317    * @param numDataNodes Number of DataNodes to start; may be zero
318    * @param manageDfsDirs if true, the data directories for DataNodes will be
319    *          created and dfs.data.dir will be set in the conf
   * @param operation the operation with which to start the DataNodes.  Only
   *          StartupOption.ROLLBACK is passed on to the DataNodes; for any
   *          other value, including null, no startup arguments are passed.
322    * @param racks array of strings indicating the rack that each DataNode is on
323    * @param hosts array of strings indicating the hostnames for each DataNode
324    * @param simulatedCapacities array of capacities of the simulated data nodes
325    *
326    * @throws IllegalStateException if NameNode has been shutdown
327    */
startDataNodes(Configuration conf, int numDataNodes, boolean manageDfsDirs, StartupOption operation, String[] racks, String[] hosts, long[] simulatedCapacities)328   public synchronized void startDataNodes(Configuration conf, int numDataNodes,
329                              boolean manageDfsDirs, StartupOption operation,
330                              String[] racks, String[] hosts,
331                              long[] simulatedCapacities) throws IOException {
332     conf.set("slave.host.name", "127.0.0.1");
333 
334     int curDatanodesNum = dataNodes.size();
335     // for mincluster's the default initialDelay for BRs is 0
336     if (conf.get("dfs.blockreport.initialDelay") == null) {
337       conf.setLong("dfs.blockreport.initialDelay", 0);
338     }
339     // If minicluster's name node is null assume that the conf has been
340     // set with the right address:port of the name node.
341     //
342     if (nameNode != null) { // set conf from the name node
343       InetSocketAddress nnAddr = nameNode.getNameNodeAddress();
344       int nameNodePort = nnAddr.getPort();
345       FileSystem.setDefaultUri(conf,
346                                "hdfs://"+ nnAddr.getHostName() +
347                                ":" + Integer.toString(nameNodePort));
348     }
349 
350     if (racks != null && numDataNodes > racks.length ) {
351       throw new IllegalArgumentException( "The length of racks [" + racks.length
352           + "] is less than the number of datanodes [" + numDataNodes + "].");
353     }
354     if (hosts != null && numDataNodes > hosts.length ) {
355       throw new IllegalArgumentException( "The length of hosts [" + hosts.length
356           + "] is less than the number of datanodes [" + numDataNodes + "].");
357     }
358     //Generate some hostnames if required
359     if (racks != null && hosts == null) {
360       System.out.println("Generating host names for datanodes");
361       hosts = new String[numDataNodes];
362       for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
363         hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
364       }
365     }
366 
367     if (simulatedCapacities != null
368         && numDataNodes > simulatedCapacities.length) {
369       throw new IllegalArgumentException( "The length of simulatedCapacities ["
370           + simulatedCapacities.length
371           + "] is less than the number of datanodes [" + numDataNodes + "].");
372     }
373 
374     // Set up the right ports for the datanodes
375     conf.set("dfs.datanode.address", "127.0.0.1:0");
376     conf.set("dfs.datanode.http.address", "127.0.0.1:0");
377     conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
378 
379 
380     String [] dnArgs = (operation == null ||
381                         operation != StartupOption.ROLLBACK) ?
382         null : new String[] {operation.getName()};
383 
384 
385     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
386       Configuration dnConf = new Configuration(conf);
387       if (manageDfsDirs) {
388         File dir1 = new File(data_dir, "data"+(2*i+1));
389         File dir2 = new File(data_dir, "data"+(2*i+2));
390         dir1.mkdirs();
391         dir2.mkdirs();
392         if (!dir1.isDirectory() || !dir2.isDirectory()) {
393           throw new IOException("Mkdirs failed to create directory for DataNode "
394                                 + i + ": " + dir1 + " or " + dir2);
395         }
396         dnConf.set(DataNode.DATA_DIR_KEY, dir1.getPath() + "," + dir2.getPath());
397       }
398       if (simulatedCapacities != null) {
399         dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
400         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
401             simulatedCapacities[i-curDatanodesNum]);
402       }
403       System.out.println("Starting DataNode " + i + " with dfs.data.dir: "
404                          + dnConf.get("dfs.data.dir"));
405       if (hosts != null) {
406         dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
407         System.out.println("Starting DataNode " + i + " with hostname set to: "
408                            + dnConf.get("slave.host.name"));
409       }
410       if (racks != null) {
411         String name = hosts[i - curDatanodesNum];
412         System.out.println("Adding node with hostname : " + name + " to rack "+
413                             racks[i-curDatanodesNum]);
414         StaticMapping.addNodeToRack(name,
415                                     racks[i-curDatanodesNum]);
416       }
417       Configuration newconf = new Configuration(dnConf); // save config
418       if (hosts != null) {
419         NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
420       }
421       DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
422       //NOTE: the following is true if and only if:
423       //      hadoop.security.token.service.use_ip=true
424       //since the HDFS does things based on IP:port, we need to add the mapping
425       //for IP:port to rackId
426       String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
427       if (racks != null) {
428         int port = dn.getSelfAddr().getPort();
429         System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
430                             " to rack " + racks[i-curDatanodesNum]);
431         StaticMapping.addNodeToRack(ipAddr + ":" + port,
432                                   racks[i-curDatanodesNum]);
433       }
434       DataNode.runDatanodeDaemon(dn);
435       dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
436     }
437     curDatanodesNum += numDataNodes;
438     this.numDataNodes += numDataNodes;
439     waitActive();
440   }
441 
442   /**
443    * Modify the config and start up the DataNodes.  The info port for
444    * DataNodes is guaranteed to use a free port.
445    *
446    * @param conf the base configuration to use in starting the DataNodes.  This
447    *          will be modified as necessary.
448    * @param numDataNodes Number of DataNodes to start; may be zero
449    * @param manageDfsDirs if true, the data directories for DataNodes will be
450    *          created and dfs.data.dir will be set in the conf
   * @param operation the operation with which to start the DataNodes.  Only
   *          StartupOption.ROLLBACK is passed on to the DataNodes; for any
   *          other value, including null, no startup arguments are passed.
453    * @param racks array of strings indicating the rack that each DataNode is on
454    *
455    * @throws IllegalStateException if NameNode has been shutdown
456    */
457 
startDataNodes(Configuration conf, int numDataNodes, boolean manageDfsDirs, StartupOption operation, String[] racks )458   public void startDataNodes(Configuration conf, int numDataNodes,
459       boolean manageDfsDirs, StartupOption operation,
460       String[] racks
461       ) throws IOException {
462     startDataNodes( conf,  numDataNodes, manageDfsDirs,  operation, racks, null, null);
463   }
464 
465   /**
466    * Modify the config and start up additional DataNodes.  The info port for
467    * DataNodes is guaranteed to use a free port.
468    *
469    *  Data nodes can run with the name node in the mini cluster or
470    *  a real name node. For example, running with a real name node is useful
471    *  when running simulated data nodes with a real name node.
472    *  If minicluster's name node is null assume that the conf has been
473    *  set with the right address:port of the name node.
474    *
475    * @param conf the base configuration to use in starting the DataNodes.  This
476    *          will be modified as necessary.
477    * @param numDataNodes Number of DataNodes to start; may be zero
478    * @param manageDfsDirs if true, the data directories for DataNodes will be
479    *          created and dfs.data.dir will be set in the conf
   * @param operation the operation with which to start the DataNodes.  Only
   *          StartupOption.ROLLBACK is passed on to the DataNodes; for any
   *          other value, including null, no startup arguments are passed.
482    * @param racks array of strings indicating the rack that each DataNode is on
483    * @param simulatedCapacities array of capacities of the simulated data nodes
484    *
485    * @throws IllegalStateException if NameNode has been shutdown
486    */
startDataNodes(Configuration conf, int numDataNodes, boolean manageDfsDirs, StartupOption operation, String[] racks, long[] simulatedCapacities)487   public void startDataNodes(Configuration conf, int numDataNodes,
488                              boolean manageDfsDirs, StartupOption operation,
489                              String[] racks,
490                              long[] simulatedCapacities) throws IOException {
491     startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null,
492                    simulatedCapacities);
493 
494   }
495   /**
496    * If the NameNode is running, attempt to finalize a previous upgrade.
497    * When this method return, the NameNode should be finalized, but
498    * DataNodes may not be since that occurs asynchronously.
499    *
500    * @throws IllegalStateException if the Namenode is not running.
501    */
finalizeCluster(Configuration conf)502   public void finalizeCluster(Configuration conf) throws Exception {
503     if (nameNode == null) {
504       throw new IllegalStateException("Attempting to finalize "
505                                       + "Namenode but it is not running");
506     }
507     ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
508   }
509 
510   /**
511    * Gets the started NameNode.  May be null.
512    */
getNameNode()513   public NameNode getNameNode() {
514     return nameNode;
515   }
516 
517   /**
518    * Gets a list of the started DataNodes.  May be empty.
519    */
getDataNodes()520   public ArrayList<DataNode> getDataNodes() {
521     ArrayList<DataNode> list = new ArrayList<DataNode>();
522     for (int i = 0; i < dataNodes.size(); i++) {
523       DataNode node = dataNodes.get(i).datanode;
524       list.add(node);
525     }
526     return list;
527   }
528 
529   /** @return the datanode having the ipc server listen port */
getDataNode(int ipcPort)530   public DataNode getDataNode(int ipcPort) {
531     for(DataNode dn : getDataNodes()) {
532       if (dn.ipcServer.getListenerAddress().getPort() == ipcPort) {
533         return dn;
534       }
535     }
536     return null;
537   }
538 
539   /**
540    * Gets the rpc port used by the NameNode, because the caller
541    * supplied port is not necessarily the actual port used.
542    */
getNameNodePort()543   public int getNameNodePort() {
544     return nameNode.getNameNodeAddress().getPort();
545   }
546 
547   /**
548    * Shut down the servers that are up.
549    */
shutdown()550   public void shutdown() {
551     System.out.println("Shutting down the Mini HDFS Cluster");
552     shutdownDataNodes();
553     if (nameNode != null) {
554       nameNode.stop();
555       nameNode.join();
556       nameNode = null;
557     }
558   }
559 
560   /**
561    * Shutdown all DataNodes started by this class.  The NameNode
562    * is left running so that new DataNodes may be started.
563    */
shutdownDataNodes()564   public void shutdownDataNodes() {
565     for (int i = dataNodes.size()-1; i >= 0; i--) {
566       System.out.println("Shutting down DataNode " + i);
567       DataNode dn = dataNodes.remove(i).datanode;
568       dn.shutdown();
569       numDataNodes--;
570     }
571   }
572 
573   /**
574    * Shutdown namenode.
575    */
shutdownNameNode()576   public synchronized void shutdownNameNode() {
577     if (nameNode != null) {
578       System.out.println("Shutting down the namenode");
579       nameNode.stop();
580       nameNode.join();
581       nameNode = null;
582     }
583   }
584 
585   /** Same as restartNameNode(true, true). */
restartNameNode()586   public synchronized void restartNameNode() throws IOException {
587     restartNameNode(true, true);
588   }
589 
590   /** Same as restartNameNode(waitSafemodeExit, true). */
restartNameNode(boolean waitSafemodeExit )591   public synchronized void restartNameNode(boolean waitSafemodeExit
592       ) throws IOException {
593     restartNameNode(waitSafemodeExit, true);
594   }
595 
596   /**
597    * Restart namenode.
598    *
599    * @param waitSafemodeExit Should it wait for safe mode to turn off?
600    * @param waitClusterActive Should it wait for cluster to be active?
601    * @throws IOException
602    */
restartNameNode(boolean waitSafemodeExit, boolean waitClusterActive)603   public synchronized void restartNameNode(boolean waitSafemodeExit,
604       boolean waitClusterActive) throws IOException {
605     shutdownNameNode();
606     nameNode = NameNode.createNameNode(new String[] {}, conf);
607     if (waitSafemodeExit) {
608       waitClusterUp();
609     }
610     System.out.println("Restarted the namenode");
611 
612     int failedCount = 0;
613     while(waitClusterActive) {
614       try {
615         waitActive();
616         break;
617       } catch (IOException e) {
618         failedCount++;
619         // Cached RPC connection to namenode, if any, is expected to fail once
620         if (failedCount > 1) {
621           System.out.println("Tried waitActive() " + failedCount
622               + " time(s) and failed, giving up.  "
623               + StringUtils.stringifyException(e));
624           throw e;
625         }
626       }
627     }
628   }
629 
630   /*
   * Corrupt a block on all datanodes
632    */
corruptBlockOnDataNodes(String blockName)633   void corruptBlockOnDataNodes(String blockName) throws Exception{
634     for (int i=0; i < dataNodes.size(); i++)
635       corruptBlockOnDataNode(i,blockName);
636   }
637 
638   /*
639    * Corrupt a block on a particular datanode
640    */
corruptBlockOnDataNode(int i, String blockName)641   boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
642     Random random = new Random();
643     boolean corrupted = false;
644     File dataDir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/data");
645     if (i < 0 || i >= dataNodes.size())
646       return false;
647     for (int dn = i*2; dn < i*2+2; dn++) {
648       File blockFile = new File(dataDir, "data" + (dn+1) + "/current/" +
649                                 blockName);
650       System.out.println("Corrupting for: " + blockFile);
651       if (blockFile.exists()) {
652         // Corrupt replica by writing random bytes into replica
653         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
654         FileChannel channel = raFile.getChannel();
655         String badString = "BADBAD";
656         int rand = random.nextInt((int)channel.size()/2);
657         raFile.seek(rand);
658         raFile.write(badString.getBytes());
659         raFile.close();
660       }
661       corrupted = true;
662     }
663     return corrupted;
664   }
665 
666   /*
667    * Shutdown a particular datanode
668    */
stopDataNode(int i)669   public synchronized DataNodeProperties stopDataNode(int i) {
670     if (i < 0 || i >= dataNodes.size()) {
671       return null;
672     }
673     DataNodeProperties dnprop = dataNodes.remove(i);
674     DataNode dn = dnprop.datanode;
675     System.out.println("MiniDFSCluster Stopping DataNode " +
676                        dn.dnRegistration.getName() +
677                        " from a total of " + (dataNodes.size() + 1) +
678                        " datanodes.");
679     dn.shutdown();
680     numDataNodes--;
681     return dnprop;
682   }
683 
684   /*
685    * Shutdown a datanode by name.
686    */
stopDataNode(String name)687   public synchronized DataNodeProperties stopDataNode(String name) {
688     int i;
689     for (i = 0; i < dataNodes.size(); i++) {
690       DataNode dn = dataNodes.get(i).datanode;
691       if (dn.dnRegistration.getName().equals(name)) {
692         break;
693       }
694     }
695     return stopDataNode(i);
696   }
697 
698   /**
699    * Restart a datanode
700    * @param dnprop datanode's property
701    * @return true if restarting is successful
702    * @throws IOException
703    */
restartDataNode(DataNodeProperties dnprop)704   public boolean restartDataNode(DataNodeProperties dnprop) throws IOException {
705     return restartDataNode(dnprop, false);
706   }
707 
708   /**
709    * Restart a datanode, on the same port if requested
710    * @param dnprop, the datanode to restart
711    * @param keepPort, whether to use the same port
712    * @return true if restarting is successful
713    * @throws IOException
714    */
restartDataNode(DataNodeProperties dnprop, boolean keepPort)715   public synchronized boolean restartDataNode(DataNodeProperties dnprop,
716       boolean keepPort) throws IOException {
717     Configuration conf = dnprop.conf;
718     String[] args = dnprop.dnArgs;
719     Configuration newconf = new Configuration(conf); // save cloned config
720     if (keepPort) {
721       InetSocketAddress addr = dnprop.datanode.getSelfAddr();
722       conf.set("dfs.datanode.address", addr.getAddress().getHostAddress() + ":"
723           + addr.getPort());
724     }
725     dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf),
726         newconf, args));
727     numDataNodes++;
728     return true;
729   }
730 
731   /*
732    * Restart a particular datanode, use newly assigned port
733    */
restartDataNode(int i)734   public boolean restartDataNode(int i) throws IOException {
735     return restartDataNode(i, false);
736   }
737 
738   /*
739    * Restart a particular datanode, on the same port if keepPort is true
740    */
restartDataNode(int i, boolean keepPort)741   public synchronized boolean restartDataNode(int i, boolean keepPort)
742       throws IOException {
743     DataNodeProperties dnprop = stopDataNode(i);
744     if (dnprop == null) {
745       return false;
746     } else {
747       return restartDataNode(dnprop, keepPort);
748     }
749   }
750 
751   /*
752    * Restart all datanodes, on the same ports if keepPort is true
753    */
restartDataNodes(boolean keepPort)754   public synchronized boolean restartDataNodes(boolean keepPort)
755       throws IOException {
756     for (int i = dataNodes.size() - 1; i >= 0; i--) {
757       if (!restartDataNode(i, keepPort))
758         return false;
759       System.out.println("Restarted DataNode " + i);
760     }
761     return true;
762   }
763 
764   /*
765    * Restart all datanodes, use newly assigned ports
766    */
restartDataNodes()767   public boolean restartDataNodes() throws IOException {
768     return restartDataNodes(false);
769   }
770 
771   /**
772    * Returns true if the NameNode is running and is out of Safe Mode.
773    */
isClusterUp()774   public boolean isClusterUp() {
775     if (nameNode == null) {
776       return false;
777     }
778     try {
779       long[] sizes = nameNode.getStats();
780       boolean isUp = false;
781       synchronized (this) {
782         isUp = (!nameNode.isInSafeMode() && sizes[0] != 0);
783       }
784       return isUp;
785     } catch (IOException ie) {
786       return false;
787     }
788   }
789 
790   /**
791    * Returns true if there is at least one DataNode running.
792    */
isDataNodeUp()793   public boolean isDataNodeUp() {
794     if (dataNodes == null || dataNodes.size() == 0) {
795       return false;
796     }
797     return true;
798   }
799 
800   /**
801    * Get a client handle to the DFS cluster.
802    */
getFileSystem()803   public FileSystem getFileSystem() throws IOException {
804     return FileSystem.get(conf);
805   }
806 
807   /**
808    * @return a {@link HftpFileSystem} object.
809    */
getHftpFileSystem()810   public HftpFileSystem getHftpFileSystem() throws IOException {
811     final String str = "hftp://" + conf.get("dfs.http.address");
812     try {
813       return (HftpFileSystem)FileSystem.get(new URI(str), conf);
814     } catch (URISyntaxException e) {
815       throw new IOException(e);
816     }
817   }
818 
819   /**
820    *  @return a {@link HftpFileSystem} object as specified user.
821    */
getHftpFileSystemAs(final String username, final Configuration conf, final String... groups )822   public HftpFileSystem getHftpFileSystemAs(final String username,
823       final Configuration conf, final String... groups
824       ) throws IOException, InterruptedException {
825     final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
826         username, groups);
827     return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() {
828       @Override
829       public HftpFileSystem run() throws Exception {
830         return getHftpFileSystem();
831       }
832     });
833   }
834 
835   /**
836    * Get the directories where the namenode stores its image.
837    */
838   public Collection<File> getNameDirs() {
839     return FSNamesystem.getNamespaceDirs(conf);
840   }
841 
842   /**
843    * Get the directories where the namenode stores its edits.
844    */
845   public Collection<File> getNameEditsDirs() {
846     return FSNamesystem.getNamespaceEditsDirs(conf);
847   }
848 
849   /**
850    * Wait until the cluster is active and running.
851    */
852   public void waitActive() throws IOException {
853     if (nameNode == null) {
854       return;
855     }
856     InetSocketAddress addr = NetUtils.makeSocketAddr("localhost",
857                                                    getNameNodePort());
858     DFSClient client = new DFSClient(addr, conf);
859 
860     // make sure all datanodes have registered and sent heartbeat
861     while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
862       try {
863         Thread.sleep(100);
864       } catch (InterruptedException e) {
865       }
866     }
867 
868     client.close();
869     System.out.println("Cluster is active");
870   }
871 
872   private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
873     if (dnInfo.length != numDataNodes) {
874       return true;
875     }
876     // make sure all datanodes have sent first heartbeat to namenode,
877     // using (capacity == 0) as proxy.
878     for (DatanodeInfo dn : dnInfo) {
879       if (dn.getCapacity() == 0) {
880         return true;
881       }
882     }
883     return false;
884   }
885 
886   /**
887    * Wait for the given datanode to heartbeat once.
888    */
889   public void waitForDNHeartbeat(int dnIndex, long timeoutMillis)
890     throws IOException, InterruptedException {
891     DataNode dn = getDataNodes().get(dnIndex);
892     InetSocketAddress addr = new InetSocketAddress("localhost",
893                                                    getNameNodePort());
894     DFSClient client = new DFSClient(addr, conf);
895 
896     long startTime = System.currentTimeMillis();
897     while (System.currentTimeMillis() < startTime + timeoutMillis) {
898       DatanodeInfo report[] = client.datanodeReport(DatanodeReportType.LIVE);
899 
900       for (DatanodeInfo thisReport : report) {
901         if (thisReport.getStorageID().equals(
902               dn.dnRegistration.getStorageID())) {
903           if (thisReport.getLastUpdate() > startTime)
904             return;
905         }
906       }
907 
908       Thread.sleep(500);
909     }
910   }
911 
912   public void formatDataNodeDirs() throws IOException {
913     base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");
914     data_dir = new File(base_dir, "data");
915     if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
916       throw new IOException("Cannot remove data directory: " + data_dir);
917     }
918   }
919 
920   /**
921    *
922    * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
923    * @return the block report for the specified data node
924    */
925   public Block[] getBlockReport(int dataNodeIndex) {
926     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
927       throw new IndexOutOfBoundsException();
928     }
929     return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport();
930   }
931 
932 
933   /**
934    *
935    * @return block reports from all data nodes
936    *    Block[] is indexed in the same order as the list of datanodes returned by getDataNodes()
937    */
938   public Block[][] getAllBlockReports() {
939     int numDataNodes = dataNodes.size();
940     Block[][] result = new Block[numDataNodes][];
941     for (int i = 0; i < numDataNodes; ++i) {
942      result[i] = getBlockReport(i);
943     }
944     return result;
945   }
946 
947 
948   /**
949    * This method is valid only if the data nodes have simulated data
950    * @param dataNodeIndex - data node i which to inject - the index is same as for getDataNodes()
951    * @param blocksToInject - the blocks
952    * @throws IOException
953    *              if not simulatedFSDataset
954    *             if any of blocks already exist in the data node
955    *
956    */
957   public void injectBlocks(int dataNodeIndex, Block[] blocksToInject) throws IOException {
958     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
959       throw new IndexOutOfBoundsException();
960     }
961     FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
962     if (!(dataSet instanceof SimulatedFSDataset)) {
963       throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
964     }
965     SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
966     sdataset.injectBlocks(blocksToInject);
967     dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
968   }
969 
970   /**
971    * This method is valid only if the data nodes have simulated data
972    * @param blocksToInject - blocksToInject[] is indexed in the same order as the list
973    *             of datanodes returned by getDataNodes()
974    * @throws IOException
975    *             if not simulatedFSDataset
976    *             if any of blocks already exist in the data nodes
977    *             Note the rest of the blocks are not injected.
978    */
979   public void injectBlocks(Block[][] blocksToInject) throws IOException {
980     if (blocksToInject.length >  dataNodes.size()) {
981       throw new IndexOutOfBoundsException();
982     }
983     for (int i = 0; i < blocksToInject.length; ++i) {
984      injectBlocks(i, blocksToInject[i]);
985     }
986   }
987 
  /**
   * Set the softLimit and hardLimit of client lease periods on the
   * namenode's lease manager, then interrupt the namesystem's
   * {@code lmthread}.
   *
   * @param soft the soft limit passed to the lease manager
   * @param hard the hard limit passed to the lease manager
   */
  void setLeasePeriod(long soft, long hard) {
    nameNode.getNamesystem().leaseManager.setLeasePeriod(soft, hard);
    // NOTE(review): presumably wakes the lease monitor thread so the new
    // limits take effect promptly - confirm against FSNamesystem.
    nameNode.getNamesystem().lmthread.interrupt();
  }
995 
996   /**
997    * Returns the current set of datanodes
998    */
999   DataNode[] listDataNodes() {
1000     DataNode[] list = new DataNode[dataNodes.size()];
1001     for (int i = 0; i < dataNodes.size(); i++) {
1002       list[i] = dataNodes.get(i).datanode;
1003     }
1004     return list;
1005   }
1006 
1007   /**
1008    * Access to the data directory used for Datanodes
1009    * @throws IOException
1010    */
1011   public String getDataDirectory() {
1012     return data_dir.getAbsolutePath();
1013   }
1014 
1015   public static File getBaseDir() {
1016     return new File(System.getProperty(
1017       "test.build.data", "build/test/data"), "dfs/");
1018   }
1019 }
1020