/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.FileChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;

/**
 * This class creates a single-process DFS cluster for junit testing.
 * The data directories for non-simulated DFS are under the testing directory.
 * For simulated data nodes, no underlying fs storage is used.
 */
public class MiniDFSCluster {

  /**
   * Bundles a DataNode with the configuration and the argument array that
   * were recorded when it was started, so that the node can later be
   * restarted from the same settings.
   */
  public class DataNodeProperties {
    // The DataNode instance these properties belong to.
    DataNode datanode;
    // Configuration saved for this DataNode.
    Configuration conf;
    // Command-line style arguments the DataNode was created with; may be null.
    String[] dnArgs;

    DataNodeProperties(DataNode node, Configuration conf, String[] args) {
      this.datanode = node;
      this.conf = conf;
      this.dnArgs = args;
    }
  }

  // Configuration handed to the main constructor; reused when creating
  // clients and when restarting the NameNode.
  private Configuration conf;
  // The NameNode run by this cluster, or null when none is running.
  protected NameNode nameNode;
  // Number of DataNodes currently tracked by this cluster.
  protected int numDataNodes;
  // One entry per DataNode that is currently started.
  protected List<DataNodeProperties> dataNodes =
    new ArrayList<DataNodeProperties>();
  // Root test directory ("dfs/" below the test.build.data property).
  private File base_dir;
  // Directory below base_dir that holds the DataNode storage directories.
  protected File data_dir;

  /**
   * This null constructor is used only when wishing to start a data node cluster
   * without a name node (ie when the name node is started elsewhere).
   */
  public MiniDFSCluster() {
  }

  /**
   * Modify the config and start up the servers with the given operation.
   * Servers will be started on free ports.
   * <p>
   * The caller must manage the creation of NameNode and DataNode directories
   * and have already set dfs.name.dir and dfs.data.dir in the given conf.
   *
   * @param conf the base configuration to use in starting the servers. This
   *          will be modified as necessary.
   * @param numDataNodes Number of DataNodes to start; may be zero
   * @param nameNodeOperation the operation with which to start the servers. 
If null
   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
   */
  public MiniDFSCluster(Configuration conf,
                        int numDataNodes,
                        StartupOption nameNodeOperation) throws IOException {
    // Port 0, no formatting, no directory management for NameNode or
    // DataNodes, and no racks, hosts or simulated capacities.
    this(0, conf, numDataNodes, false, false, false, nameNodeOperation,
          null, null, null);
  }

  /**
   * Modify the config and start up the servers. The rpc and info ports for
   * servers are guaranteed to use free ports.
   * <p>
   * NameNode and DataNode directory creation and configuration will be
   * managed by this class.
   *
   * @param conf the base configuration to use in starting the servers. This
   *          will be modified as necessary.
   * @param numDataNodes Number of DataNodes to start; may be zero
   * @param format if true, format the NameNode and DataNodes before starting up
   * @param racks array of strings indicating the rack that each DataNode is on
   */
  public MiniDFSCluster(Configuration conf,
                        int numDataNodes,
                        boolean format,
                        String[] racks) throws IOException {
    // Port 0, managed name and data directories, no explicit startup
    // operation, no hosts and no simulated capacities.
    this(0, conf, numDataNodes, format, true, true, null, racks, null, null);
  }

  /**
   * Modify the config and start up the servers. The rpc and info ports for
   * servers are guaranteed to use free ports.
   * <p>
   * NameNode and DataNode directory creation and configuration will be
   * managed by this class.
   *
   * @param conf the base configuration to use in starting the servers. This
   *          will be modified as necessary.
   * @param numDataNodes Number of DataNodes to start; may be zero
   * @param format if true, format the NameNode and DataNodes before starting up
   * @param racks array of strings indicating the rack that each DataNode is on
   * @param hosts array of strings indicating the hostname for each DataNode
   */
  public MiniDFSCluster(Configuration conf,
                        int numDataNodes,
                        boolean format,
                        String[] racks, String[] hosts) throws IOException {
    // Same as the four-argument form, but forwards the given host names.
    this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null);
  }

  /**
   * NOTE: if possible, the other constructors that don't have nameNode port
   * parameter should be used as they will ensure that the servers use free ports.
   * <p>
   * Modify the config and start up the servers.
   *
   * @param nameNodePort suggestion for which rpc port to use. caller should
   *          use getNameNodePort() to get the actual port used.
   * @param conf the base configuration to use in starting the servers. This
   *          will be modified as necessary.
   * @param numDataNodes Number of DataNodes to start; may be zero
   * @param format if true, format the NameNode and DataNodes before starting up
   * @param manageDfsDirs if true, the data directories for servers will be
   *          created and dfs.name.dir and dfs.data.dir will be set in the conf
   * @param operation the operation with which to start the servers. If null
   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
   * @param racks array of strings indicating the rack that each DataNode is on
   */
  public MiniDFSCluster(int nameNodePort,
                        Configuration conf,
                        int numDataNodes,
                        boolean format,
                        boolean manageDfsDirs,
                        StartupOption operation,
                        String[] racks) throws IOException {
    // manageDfsDirs is applied to both the name and the data directories.
    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
         operation, racks, null, null);
  }

  /**
   * NOTE: if possible, the other constructors that don't have nameNode port
   * parameter should be used as they will ensure that the servers use free ports.
   * <p>
   * Modify the config and start up the servers.
   *
   * @param nameNodePort suggestion for which rpc port to use. caller should
   *          use getNameNodePort() to get the actual port used.
   * @param conf the base configuration to use in starting the servers. This
   *          will be modified as necessary.
   * @param numDataNodes Number of DataNodes to start; may be zero
   * @param format if true, format the NameNode and DataNodes before starting up
   * @param manageDfsDirs if true, the data directories for servers will be
   *          created and dfs.name.dir and dfs.data.dir will be set in the conf
   * @param operation the operation with which to start the servers. If null
   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
   * @param racks array of strings indicating the rack that each DataNode is on
   * @param simulatedCapacities array of capacities of the simulated data nodes
   */
  public MiniDFSCluster(int nameNodePort,
                        Configuration conf,
                        int numDataNodes,
                        boolean format,
                        boolean manageDfsDirs,
                        StartupOption operation,
                        String[] racks,
                        long[] simulatedCapacities) throws IOException {
    // manageDfsDirs is applied to both the name and the data directories;
    // no explicit host names are supplied.
    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
          operation, racks, null, simulatedCapacities);
  }

  /**
   * NOTE: if possible, the other constructors that don't have nameNode port
   * parameter should be used as they will ensure that the servers use free ports.
   * <p>
   * Modify the config and start up the servers.
   *
   * @param nameNodePort suggestion for which rpc port to use. caller should
   *          use getNameNodePort() to get the actual port used.
   * @param conf the base configuration to use in starting the servers. This
   *          will be modified as necessary.
   * @param numDataNodes Number of DataNodes to start; may be zero
   * @param format if true, format the NameNode and DataNodes before starting up
   * @param manageNameDfsDirs if true, dfs.name.dir and fs.checkpoint.dir will
   *          be set in the conf to directories below the test base directory
   * @param manageDataDfsDirs if true, the data directories for datanodes will
   *          be created and dfs.data.dir set to same in the conf
   * @param operation the operation with which to start the servers. If null
   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used. 
226 * @param racks array of strings indicating the rack that each DataNode is on 227 * @param hosts array of strings indicating the hostnames of each DataNode 228 * @param simulatedCapacities array of capacities of the simulated data nodes 229 */ MiniDFSCluster(int nameNodePort, Configuration conf, int numDataNodes, boolean format, boolean manageNameDfsDirs, boolean manageDataDfsDirs, StartupOption operation, String[] racks, String hosts[], long[] simulatedCapacities)230 public MiniDFSCluster(int nameNodePort, 231 Configuration conf, 232 int numDataNodes, 233 boolean format, 234 boolean manageNameDfsDirs, 235 boolean manageDataDfsDirs, 236 StartupOption operation, 237 String[] racks, String hosts[], 238 long[] simulatedCapacities) throws IOException { 239 this.conf = conf; 240 base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/"); 241 data_dir = new File(base_dir, "data"); 242 243 // Setup the NameNode configuration 244 FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort)); 245 conf.set("dfs.http.address", "127.0.0.1:0"); 246 if (manageNameDfsDirs) { 247 conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+ 248 new File(base_dir, "name2").getPath()); 249 conf.set("fs.checkpoint.dir", new File(base_dir, "namesecondary1"). 250 getPath()+"," + new File(base_dir, "namesecondary2").getPath()); 251 } 252 253 int replication = conf.getInt("dfs.replication", 3); 254 conf.setInt("dfs.replication", Math.min(replication, numDataNodes)); 255 int safemodeExtension = conf.getInt("dfs.safemode.extension.testing", 0); 256 conf.setInt("dfs.safemode.extension", safemodeExtension); 257 conf.setInt("dfs.namenode.decommission.interval", 3); // 3 second 258 259 // Set a small delay on blockReceived in the minicluster to approximate 260 // a real cluster a little better and suss out bugs. 
261 conf.setInt("dfs.datanode.artificialBlockReceivedDelay", 5); 262 263 // Format and clean out DataNode directories 264 if (format) { 265 if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) { 266 throw new IOException("Cannot remove data directory: " + data_dir); 267 } 268 NameNode.format(conf); 269 } 270 271 // Start the NameNode 272 String[] args = (operation == null || 273 operation == StartupOption.FORMAT || 274 operation == StartupOption.REGULAR) ? 275 new String[] {} : new String[] {operation.getName()}; 276 conf.setClass("topology.node.switch.mapping.impl", 277 StaticMapping.class, DNSToSwitchMapping.class); 278 nameNode = NameNode.createNameNode(args, conf); 279 280 if (operation == StartupOption.RECOVER) { 281 return; 282 } 283 // Start the DataNodes 284 startDataNodes(conf, numDataNodes, manageDataDfsDirs, 285 operation, racks, hosts, simulatedCapacities); 286 waitClusterUp(); 287 } 288 289 /** 290 * wait for the cluster to get out of 291 * safemode. 292 */ waitClusterUp()293 public void waitClusterUp() { 294 if (numDataNodes > 0) { 295 while (!isClusterUp()) { 296 try { 297 System.err.println("Waiting for the Mini HDFS Cluster to start..."); 298 Thread.sleep(1000); 299 } catch (InterruptedException e) { 300 } 301 } 302 } 303 } 304 305 /** 306 * Modify the config and start up additional DataNodes. The info port for 307 * DataNodes is guaranteed to use a free port. 308 * 309 * Data nodes can run with the name node in the mini cluster or 310 * a real name node. For example, running with a real name node is useful 311 * when running simulated data nodes with a real name node. 312 * If minicluster's name node is null assume that the conf has been 313 * set with the right address:port of the name node. 314 * 315 * @param conf the base configuration to use in starting the DataNodes. This 316 * will be modified as necessary. 
317 * @param numDataNodes Number of DataNodes to start; may be zero 318 * @param manageDfsDirs if true, the data directories for DataNodes will be 319 * created and dfs.data.dir will be set in the conf 320 * @param operation the operation with which to start the DataNodes. If null 321 * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. 322 * @param racks array of strings indicating the rack that each DataNode is on 323 * @param hosts array of strings indicating the hostnames for each DataNode 324 * @param simulatedCapacities array of capacities of the simulated data nodes 325 * 326 * @throws IllegalStateException if NameNode has been shutdown 327 */ startDataNodes(Configuration conf, int numDataNodes, boolean manageDfsDirs, StartupOption operation, String[] racks, String[] hosts, long[] simulatedCapacities)328 public synchronized void startDataNodes(Configuration conf, int numDataNodes, 329 boolean manageDfsDirs, StartupOption operation, 330 String[] racks, String[] hosts, 331 long[] simulatedCapacities) throws IOException { 332 conf.set("slave.host.name", "127.0.0.1"); 333 334 int curDatanodesNum = dataNodes.size(); 335 // for mincluster's the default initialDelay for BRs is 0 336 if (conf.get("dfs.blockreport.initialDelay") == null) { 337 conf.setLong("dfs.blockreport.initialDelay", 0); 338 } 339 // If minicluster's name node is null assume that the conf has been 340 // set with the right address:port of the name node. 
341 // 342 if (nameNode != null) { // set conf from the name node 343 InetSocketAddress nnAddr = nameNode.getNameNodeAddress(); 344 int nameNodePort = nnAddr.getPort(); 345 FileSystem.setDefaultUri(conf, 346 "hdfs://"+ nnAddr.getHostName() + 347 ":" + Integer.toString(nameNodePort)); 348 } 349 350 if (racks != null && numDataNodes > racks.length ) { 351 throw new IllegalArgumentException( "The length of racks [" + racks.length 352 + "] is less than the number of datanodes [" + numDataNodes + "]."); 353 } 354 if (hosts != null && numDataNodes > hosts.length ) { 355 throw new IllegalArgumentException( "The length of hosts [" + hosts.length 356 + "] is less than the number of datanodes [" + numDataNodes + "]."); 357 } 358 //Generate some hostnames if required 359 if (racks != null && hosts == null) { 360 System.out.println("Generating host names for datanodes"); 361 hosts = new String[numDataNodes]; 362 for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) { 363 hosts[i - curDatanodesNum] = "host" + i + ".foo.com"; 364 } 365 } 366 367 if (simulatedCapacities != null 368 && numDataNodes > simulatedCapacities.length) { 369 throw new IllegalArgumentException( "The length of simulatedCapacities [" 370 + simulatedCapacities.length 371 + "] is less than the number of datanodes [" + numDataNodes + "]."); 372 } 373 374 // Set up the right ports for the datanodes 375 conf.set("dfs.datanode.address", "127.0.0.1:0"); 376 conf.set("dfs.datanode.http.address", "127.0.0.1:0"); 377 conf.set("dfs.datanode.ipc.address", "127.0.0.1:0"); 378 379 380 String [] dnArgs = (operation == null || 381 operation != StartupOption.ROLLBACK) ? 
382 null : new String[] {operation.getName()}; 383 384 385 for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { 386 Configuration dnConf = new Configuration(conf); 387 if (manageDfsDirs) { 388 File dir1 = new File(data_dir, "data"+(2*i+1)); 389 File dir2 = new File(data_dir, "data"+(2*i+2)); 390 dir1.mkdirs(); 391 dir2.mkdirs(); 392 if (!dir1.isDirectory() || !dir2.isDirectory()) { 393 throw new IOException("Mkdirs failed to create directory for DataNode " 394 + i + ": " + dir1 + " or " + dir2); 395 } 396 dnConf.set(DataNode.DATA_DIR_KEY, dir1.getPath() + "," + dir2.getPath()); 397 } 398 if (simulatedCapacities != null) { 399 dnConf.setBoolean("dfs.datanode.simulateddatastorage", true); 400 dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, 401 simulatedCapacities[i-curDatanodesNum]); 402 } 403 System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 404 + dnConf.get("dfs.data.dir")); 405 if (hosts != null) { 406 dnConf.set("slave.host.name", hosts[i - curDatanodesNum]); 407 System.out.println("Starting DataNode " + i + " with hostname set to: " 408 + dnConf.get("slave.host.name")); 409 } 410 if (racks != null) { 411 String name = hosts[i - curDatanodesNum]; 412 System.out.println("Adding node with hostname : " + name + " to rack "+ 413 racks[i-curDatanodesNum]); 414 StaticMapping.addNodeToRack(name, 415 racks[i-curDatanodesNum]); 416 } 417 Configuration newconf = new Configuration(dnConf); // save config 418 if (hosts != null) { 419 NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost"); 420 } 421 DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf); 422 //NOTE: the following is true if and only if: 423 // hadoop.security.token.service.use_ip=true 424 //since the HDFS does things based on IP:port, we need to add the mapping 425 //for IP:port to rackId 426 String ipAddr = dn.getSelfAddr().getAddress().getHostAddress(); 427 if (racks != null) { 428 int port = dn.getSelfAddr().getPort(); 429 
System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+ 430 " to rack " + racks[i-curDatanodesNum]); 431 StaticMapping.addNodeToRack(ipAddr + ":" + port, 432 racks[i-curDatanodesNum]); 433 } 434 DataNode.runDatanodeDaemon(dn); 435 dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs)); 436 } 437 curDatanodesNum += numDataNodes; 438 this.numDataNodes += numDataNodes; 439 waitActive(); 440 } 441 442 /** 443 * Modify the config and start up the DataNodes. The info port for 444 * DataNodes is guaranteed to use a free port. 445 * 446 * @param conf the base configuration to use in starting the DataNodes. This 447 * will be modified as necessary. 448 * @param numDataNodes Number of DataNodes to start; may be zero 449 * @param manageDfsDirs if true, the data directories for DataNodes will be 450 * created and dfs.data.dir will be set in the conf 451 * @param operation the operation with which to start the DataNodes. If null 452 * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. 453 * @param racks array of strings indicating the rack that each DataNode is on 454 * 455 * @throws IllegalStateException if NameNode has been shutdown 456 */ 457 startDataNodes(Configuration conf, int numDataNodes, boolean manageDfsDirs, StartupOption operation, String[] racks )458 public void startDataNodes(Configuration conf, int numDataNodes, 459 boolean manageDfsDirs, StartupOption operation, 460 String[] racks 461 ) throws IOException { 462 startDataNodes( conf, numDataNodes, manageDfsDirs, operation, racks, null, null); 463 } 464 465 /** 466 * Modify the config and start up additional DataNodes. The info port for 467 * DataNodes is guaranteed to use a free port. 468 * 469 * Data nodes can run with the name node in the mini cluster or 470 * a real name node. For example, running with a real name node is useful 471 * when running simulated data nodes with a real name node. 
472 * If minicluster's name node is null assume that the conf has been 473 * set with the right address:port of the name node. 474 * 475 * @param conf the base configuration to use in starting the DataNodes. This 476 * will be modified as necessary. 477 * @param numDataNodes Number of DataNodes to start; may be zero 478 * @param manageDfsDirs if true, the data directories for DataNodes will be 479 * created and dfs.data.dir will be set in the conf 480 * @param operation the operation with which to start the DataNodes. If null 481 * or StartupOption.FORMAT, then StartupOption.REGULAR will be used. 482 * @param racks array of strings indicating the rack that each DataNode is on 483 * @param simulatedCapacities array of capacities of the simulated data nodes 484 * 485 * @throws IllegalStateException if NameNode has been shutdown 486 */ startDataNodes(Configuration conf, int numDataNodes, boolean manageDfsDirs, StartupOption operation, String[] racks, long[] simulatedCapacities)487 public void startDataNodes(Configuration conf, int numDataNodes, 488 boolean manageDfsDirs, StartupOption operation, 489 String[] racks, 490 long[] simulatedCapacities) throws IOException { 491 startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null, 492 simulatedCapacities); 493 494 } 495 /** 496 * If the NameNode is running, attempt to finalize a previous upgrade. 497 * When this method return, the NameNode should be finalized, but 498 * DataNodes may not be since that occurs asynchronously. 499 * 500 * @throws IllegalStateException if the Namenode is not running. 501 */ finalizeCluster(Configuration conf)502 public void finalizeCluster(Configuration conf) throws Exception { 503 if (nameNode == null) { 504 throw new IllegalStateException("Attempting to finalize " 505 + "Namenode but it is not running"); 506 } 507 ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"}); 508 } 509 510 /** 511 * Gets the started NameNode. May be null. 
512 */ getNameNode()513 public NameNode getNameNode() { 514 return nameNode; 515 } 516 517 /** 518 * Gets a list of the started DataNodes. May be empty. 519 */ getDataNodes()520 public ArrayList<DataNode> getDataNodes() { 521 ArrayList<DataNode> list = new ArrayList<DataNode>(); 522 for (int i = 0; i < dataNodes.size(); i++) { 523 DataNode node = dataNodes.get(i).datanode; 524 list.add(node); 525 } 526 return list; 527 } 528 529 /** @return the datanode having the ipc server listen port */ getDataNode(int ipcPort)530 public DataNode getDataNode(int ipcPort) { 531 for(DataNode dn : getDataNodes()) { 532 if (dn.ipcServer.getListenerAddress().getPort() == ipcPort) { 533 return dn; 534 } 535 } 536 return null; 537 } 538 539 /** 540 * Gets the rpc port used by the NameNode, because the caller 541 * supplied port is not necessarily the actual port used. 542 */ getNameNodePort()543 public int getNameNodePort() { 544 return nameNode.getNameNodeAddress().getPort(); 545 } 546 547 /** 548 * Shut down the servers that are up. 549 */ shutdown()550 public void shutdown() { 551 System.out.println("Shutting down the Mini HDFS Cluster"); 552 shutdownDataNodes(); 553 if (nameNode != null) { 554 nameNode.stop(); 555 nameNode.join(); 556 nameNode = null; 557 } 558 } 559 560 /** 561 * Shutdown all DataNodes started by this class. The NameNode 562 * is left running so that new DataNodes may be started. 563 */ shutdownDataNodes()564 public void shutdownDataNodes() { 565 for (int i = dataNodes.size()-1; i >= 0; i--) { 566 System.out.println("Shutting down DataNode " + i); 567 DataNode dn = dataNodes.remove(i).datanode; 568 dn.shutdown(); 569 numDataNodes--; 570 } 571 } 572 573 /** 574 * Shutdown namenode. 575 */ shutdownNameNode()576 public synchronized void shutdownNameNode() { 577 if (nameNode != null) { 578 System.out.println("Shutting down the namenode"); 579 nameNode.stop(); 580 nameNode.join(); 581 nameNode = null; 582 } 583 } 584 585 /** Same as restartNameNode(true, true). 
*/ restartNameNode()586 public synchronized void restartNameNode() throws IOException { 587 restartNameNode(true, true); 588 } 589 590 /** Same as restartNameNode(waitSafemodeExit, true). */ restartNameNode(boolean waitSafemodeExit )591 public synchronized void restartNameNode(boolean waitSafemodeExit 592 ) throws IOException { 593 restartNameNode(waitSafemodeExit, true); 594 } 595 596 /** 597 * Restart namenode. 598 * 599 * @param waitSafemodeExit Should it wait for safe mode to turn off? 600 * @param waitClusterActive Should it wait for cluster to be active? 601 * @throws IOException 602 */ restartNameNode(boolean waitSafemodeExit, boolean waitClusterActive)603 public synchronized void restartNameNode(boolean waitSafemodeExit, 604 boolean waitClusterActive) throws IOException { 605 shutdownNameNode(); 606 nameNode = NameNode.createNameNode(new String[] {}, conf); 607 if (waitSafemodeExit) { 608 waitClusterUp(); 609 } 610 System.out.println("Restarted the namenode"); 611 612 int failedCount = 0; 613 while(waitClusterActive) { 614 try { 615 waitActive(); 616 break; 617 } catch (IOException e) { 618 failedCount++; 619 // Cached RPC connection to namenode, if any, is expected to fail once 620 if (failedCount > 1) { 621 System.out.println("Tried waitActive() " + failedCount 622 + " time(s) and failed, giving up. 
" 623 + StringUtils.stringifyException(e)); 624 throw e; 625 } 626 } 627 } 628 } 629 630 /* 631 * Corrupt a block on all datanode 632 */ corruptBlockOnDataNodes(String blockName)633 void corruptBlockOnDataNodes(String blockName) throws Exception{ 634 for (int i=0; i < dataNodes.size(); i++) 635 corruptBlockOnDataNode(i,blockName); 636 } 637 638 /* 639 * Corrupt a block on a particular datanode 640 */ corruptBlockOnDataNode(int i, String blockName)641 boolean corruptBlockOnDataNode(int i, String blockName) throws Exception { 642 Random random = new Random(); 643 boolean corrupted = false; 644 File dataDir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/data"); 645 if (i < 0 || i >= dataNodes.size()) 646 return false; 647 for (int dn = i*2; dn < i*2+2; dn++) { 648 File blockFile = new File(dataDir, "data" + (dn+1) + "/current/" + 649 blockName); 650 System.out.println("Corrupting for: " + blockFile); 651 if (blockFile.exists()) { 652 // Corrupt replica by writing random bytes into replica 653 RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); 654 FileChannel channel = raFile.getChannel(); 655 String badString = "BADBAD"; 656 int rand = random.nextInt((int)channel.size()/2); 657 raFile.seek(rand); 658 raFile.write(badString.getBytes()); 659 raFile.close(); 660 } 661 corrupted = true; 662 } 663 return corrupted; 664 } 665 666 /* 667 * Shutdown a particular datanode 668 */ stopDataNode(int i)669 public synchronized DataNodeProperties stopDataNode(int i) { 670 if (i < 0 || i >= dataNodes.size()) { 671 return null; 672 } 673 DataNodeProperties dnprop = dataNodes.remove(i); 674 DataNode dn = dnprop.datanode; 675 System.out.println("MiniDFSCluster Stopping DataNode " + 676 dn.dnRegistration.getName() + 677 " from a total of " + (dataNodes.size() + 1) + 678 " datanodes."); 679 dn.shutdown(); 680 numDataNodes--; 681 return dnprop; 682 } 683 684 /* 685 * Shutdown a datanode by name. 
686 */ stopDataNode(String name)687 public synchronized DataNodeProperties stopDataNode(String name) { 688 int i; 689 for (i = 0; i < dataNodes.size(); i++) { 690 DataNode dn = dataNodes.get(i).datanode; 691 if (dn.dnRegistration.getName().equals(name)) { 692 break; 693 } 694 } 695 return stopDataNode(i); 696 } 697 698 /** 699 * Restart a datanode 700 * @param dnprop datanode's property 701 * @return true if restarting is successful 702 * @throws IOException 703 */ restartDataNode(DataNodeProperties dnprop)704 public boolean restartDataNode(DataNodeProperties dnprop) throws IOException { 705 return restartDataNode(dnprop, false); 706 } 707 708 /** 709 * Restart a datanode, on the same port if requested 710 * @param dnprop, the datanode to restart 711 * @param keepPort, whether to use the same port 712 * @return true if restarting is successful 713 * @throws IOException 714 */ restartDataNode(DataNodeProperties dnprop, boolean keepPort)715 public synchronized boolean restartDataNode(DataNodeProperties dnprop, 716 boolean keepPort) throws IOException { 717 Configuration conf = dnprop.conf; 718 String[] args = dnprop.dnArgs; 719 Configuration newconf = new Configuration(conf); // save cloned config 720 if (keepPort) { 721 InetSocketAddress addr = dnprop.datanode.getSelfAddr(); 722 conf.set("dfs.datanode.address", addr.getAddress().getHostAddress() + ":" 723 + addr.getPort()); 724 } 725 dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf), 726 newconf, args)); 727 numDataNodes++; 728 return true; 729 } 730 731 /* 732 * Restart a particular datanode, use newly assigned port 733 */ restartDataNode(int i)734 public boolean restartDataNode(int i) throws IOException { 735 return restartDataNode(i, false); 736 } 737 738 /* 739 * Restart a particular datanode, on the same port if keepPort is true 740 */ restartDataNode(int i, boolean keepPort)741 public synchronized boolean restartDataNode(int i, boolean keepPort) 742 throws IOException { 743 
DataNodeProperties dnprop = stopDataNode(i); 744 if (dnprop == null) { 745 return false; 746 } else { 747 return restartDataNode(dnprop, keepPort); 748 } 749 } 750 751 /* 752 * Restart all datanodes, on the same ports if keepPort is true 753 */ restartDataNodes(boolean keepPort)754 public synchronized boolean restartDataNodes(boolean keepPort) 755 throws IOException { 756 for (int i = dataNodes.size() - 1; i >= 0; i--) { 757 if (!restartDataNode(i, keepPort)) 758 return false; 759 System.out.println("Restarted DataNode " + i); 760 } 761 return true; 762 } 763 764 /* 765 * Restart all datanodes, use newly assigned ports 766 */ restartDataNodes()767 public boolean restartDataNodes() throws IOException { 768 return restartDataNodes(false); 769 } 770 771 /** 772 * Returns true if the NameNode is running and is out of Safe Mode. 773 */ isClusterUp()774 public boolean isClusterUp() { 775 if (nameNode == null) { 776 return false; 777 } 778 try { 779 long[] sizes = nameNode.getStats(); 780 boolean isUp = false; 781 synchronized (this) { 782 isUp = (!nameNode.isInSafeMode() && sizes[0] != 0); 783 } 784 return isUp; 785 } catch (IOException ie) { 786 return false; 787 } 788 } 789 790 /** 791 * Returns true if there is at least one DataNode running. 792 */ isDataNodeUp()793 public boolean isDataNodeUp() { 794 if (dataNodes == null || dataNodes.size() == 0) { 795 return false; 796 } 797 return true; 798 } 799 800 /** 801 * Get a client handle to the DFS cluster. 802 */ getFileSystem()803 public FileSystem getFileSystem() throws IOException { 804 return FileSystem.get(conf); 805 } 806 807 /** 808 * @return a {@link HftpFileSystem} object. 
809 */ getHftpFileSystem()810 public HftpFileSystem getHftpFileSystem() throws IOException { 811 final String str = "hftp://" + conf.get("dfs.http.address"); 812 try { 813 return (HftpFileSystem)FileSystem.get(new URI(str), conf); 814 } catch (URISyntaxException e) { 815 throw new IOException(e); 816 } 817 } 818 819 /** 820 * @return a {@link HftpFileSystem} object as specified user. 821 */ getHftpFileSystemAs(final String username, final Configuration conf, final String... groups )822 public HftpFileSystem getHftpFileSystemAs(final String username, 823 final Configuration conf, final String... groups 824 ) throws IOException, InterruptedException { 825 final UserGroupInformation ugi = UserGroupInformation.createUserForTesting( 826 username, groups); 827 return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() { 828 @Override 829 public HftpFileSystem run() throws Exception { 830 return getHftpFileSystem(); 831 } 832 }); 833 } 834 835 /** 836 * Get the directories where the namenode stores its image. 837 */ 838 public Collection<File> getNameDirs() { 839 return FSNamesystem.getNamespaceDirs(conf); 840 } 841 842 /** 843 * Get the directories where the namenode stores its edits. 844 */ 845 public Collection<File> getNameEditsDirs() { 846 return FSNamesystem.getNamespaceEditsDirs(conf); 847 } 848 849 /** 850 * Wait until the cluster is active and running. 
851 */ 852 public void waitActive() throws IOException { 853 if (nameNode == null) { 854 return; 855 } 856 InetSocketAddress addr = NetUtils.makeSocketAddr("localhost", 857 getNameNodePort()); 858 DFSClient client = new DFSClient(addr, conf); 859 860 // make sure all datanodes have registered and sent heartbeat 861 while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) { 862 try { 863 Thread.sleep(100); 864 } catch (InterruptedException e) { 865 } 866 } 867 868 client.close(); 869 System.out.println("Cluster is active"); 870 } 871 872 private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) { 873 if (dnInfo.length != numDataNodes) { 874 return true; 875 } 876 // make sure all datanodes have sent first heartbeat to namenode, 877 // using (capacity == 0) as proxy. 878 for (DatanodeInfo dn : dnInfo) { 879 if (dn.getCapacity() == 0) { 880 return true; 881 } 882 } 883 return false; 884 } 885 886 /** 887 * Wait for the given datanode to heartbeat once. 888 */ 889 public void waitForDNHeartbeat(int dnIndex, long timeoutMillis) 890 throws IOException, InterruptedException { 891 DataNode dn = getDataNodes().get(dnIndex); 892 InetSocketAddress addr = new InetSocketAddress("localhost", 893 getNameNodePort()); 894 DFSClient client = new DFSClient(addr, conf); 895 896 long startTime = System.currentTimeMillis(); 897 while (System.currentTimeMillis() < startTime + timeoutMillis) { 898 DatanodeInfo report[] = client.datanodeReport(DatanodeReportType.LIVE); 899 900 for (DatanodeInfo thisReport : report) { 901 if (thisReport.getStorageID().equals( 902 dn.dnRegistration.getStorageID())) { 903 if (thisReport.getLastUpdate() > startTime) 904 return; 905 } 906 } 907 908 Thread.sleep(500); 909 } 910 } 911 912 public void formatDataNodeDirs() throws IOException { 913 base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/"); 914 data_dir = new File(base_dir, "data"); 915 if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) { 916 throw 
new IOException("Cannot remove data directory: " + data_dir); 917 } 918 } 919 920 /** 921 * 922 * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes() 923 * @return the block report for the specified data node 924 */ 925 public Block[] getBlockReport(int dataNodeIndex) { 926 if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { 927 throw new IndexOutOfBoundsException(); 928 } 929 return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(); 930 } 931 932 933 /** 934 * 935 * @return block reports from all data nodes 936 * Block[] is indexed in the same order as the list of datanodes returned by getDataNodes() 937 */ 938 public Block[][] getAllBlockReports() { 939 int numDataNodes = dataNodes.size(); 940 Block[][] result = new Block[numDataNodes][]; 941 for (int i = 0; i < numDataNodes; ++i) { 942 result[i] = getBlockReport(i); 943 } 944 return result; 945 } 946 947 948 /** 949 * This method is valid only if the data nodes have simulated data 950 * @param dataNodeIndex - data node i which to inject - the index is same as for getDataNodes() 951 * @param blocksToInject - the blocks 952 * @throws IOException 953 * if not simulatedFSDataset 954 * if any of blocks already exist in the data node 955 * 956 */ 957 public void injectBlocks(int dataNodeIndex, Block[] blocksToInject) throws IOException { 958 if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { 959 throw new IndexOutOfBoundsException(); 960 } 961 FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset(); 962 if (!(dataSet instanceof SimulatedFSDataset)) { 963 throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); 964 } 965 SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; 966 sdataset.injectBlocks(blocksToInject); 967 dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0); 968 } 969 970 /** 971 * This method is valid only if the data nodes have simulated data 972 * 
@param blocksToInject - blocksToInject[] is indexed in the same order as the list 973 * of datanodes returned by getDataNodes() 974 * @throws IOException 975 * if not simulatedFSDataset 976 * if any of blocks already exist in the data nodes 977 * Note the rest of the blocks are not injected. 978 */ 979 public void injectBlocks(Block[][] blocksToInject) throws IOException { 980 if (blocksToInject.length > dataNodes.size()) { 981 throw new IndexOutOfBoundsException(); 982 } 983 for (int i = 0; i < blocksToInject.length; ++i) { 984 injectBlocks(i, blocksToInject[i]); 985 } 986 } 987 988 /** 989 * Set the softLimit and hardLimit of client lease periods 990 */ 991 void setLeasePeriod(long soft, long hard) { 992 nameNode.getNamesystem().leaseManager.setLeasePeriod(soft, hard); 993 nameNode.getNamesystem().lmthread.interrupt(); 994 } 995 996 /** 997 * Returns the current set of datanodes 998 */ 999 DataNode[] listDataNodes() { 1000 DataNode[] list = new DataNode[dataNodes.size()]; 1001 for (int i = 0; i < dataNodes.size(); i++) { 1002 list[i] = dataNodes.get(i).datanode; 1003 } 1004 return list; 1005 } 1006 1007 /** 1008 * Access to the data directory used for Datanodes 1009 * @throws IOException 1010 */ 1011 public String getDataDirectory() { 1012 return data_dir.getAbsolutePath(); 1013 } 1014 1015 public static File getBaseDir() { 1016 return new File(System.getProperty( 1017 "test.build.data", "build/test/data"), "dfs/"); 1018 } 1019 } 1020