1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server; 20 21 import java.io.File; 22 import java.io.IOException; 23 import java.net.InetAddress; 24 import java.net.UnknownHostException; 25 import java.util.Collection; 26 import java.util.concurrent.ConcurrentHashMap; 27 import java.util.concurrent.ConcurrentMap; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.hadoop.classification.InterfaceAudience; 32 import org.apache.hadoop.classification.InterfaceStability; 33 import org.apache.hadoop.conf.Configuration; 34 import org.apache.hadoop.fs.FileContext; 35 import org.apache.hadoop.fs.Path; 36 import org.apache.hadoop.ha.HAServiceProtocol; 37 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; 38 import org.apache.hadoop.service.AbstractService; 39 import org.apache.hadoop.service.CompositeService; 40 import org.apache.hadoop.util.Shell; 41 import org.apache.hadoop.util.Shell.ShellCommandExecutor; 42 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; 43 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 44 import org.apache.hadoop.yarn.conf.HAUtil; 45 import org.apache.hadoop.yarn.conf.YarnConfiguration; 46 import org.apache.hadoop.yarn.event.Dispatcher; 47 import org.apache.hadoop.yarn.event.EventHandler; 48 import org.apache.hadoop.yarn.exceptions.YarnException; 49 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 50 import org.apache.hadoop.yarn.factories.RecordFactory; 51 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 52 import org.apache.hadoop.yarn.server.api.ResourceTracker; 53 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; 54 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; 55 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; 56 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; 57 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; 58 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; 59 import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; 60 import org.apache.hadoop.yarn.server.nodemanager.Context; 61 import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; 62 import org.apache.hadoop.yarn.server.nodemanager.NodeManager; 63 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; 64 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; 65 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; 66 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; 67 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; 68 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; 69 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; 70 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; 71 import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; 72 import org.apache.hadoop.yarn.server.timeline.TimelineStore; 73 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; 74 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; 75 import org.apache.hadoop.yarn.webapp.util.WebAppUtils; 76 77 import com.google.common.annotations.VisibleForTesting; 78 79 /** 80 * Embedded Yarn minicluster for testcases that need to interact with a cluster. 81 * <p/> 82 * In a real cluster, resource request matching is done using the hostname, and 83 * by default Yarn minicluster works in the exact same way as a real cluster. 84 * <p/> 85 * If a testcase needs to use multiple nodes and exercise resource request 86 * matching to a specific node, then the property 87 * {@YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} should be set 88 * <code>true</code> in the configuration used to initialize the minicluster. 89 * <p/> 90 * With this property set to <code>true</code>, the matching will be done using 91 * the <code>hostname:port</code> of the namenodes. In such case, the AM must 92 * do resource request using <code>hostname:port</code> as the location. 93 */ 94 @InterfaceAudience.Public 95 @InterfaceStability.Evolving 96 public class MiniYARNCluster extends CompositeService { 97 98 private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class); 99 100 // temp fix until metrics system can auto-detect itself running in unit test: 101 static { 102 DefaultMetricsSystem.setMiniClusterMode(true); 103 } 104 105 private NodeManager[] nodeManagers; 106 private ResourceManager[] resourceManagers; 107 private String[] rmIds; 108 109 private ApplicationHistoryServer appHistoryServer; 110 111 private boolean useFixedPorts; 112 private boolean useRpc = false; 113 private int failoverTimeout; 114 115 private ConcurrentMap<ApplicationAttemptId, Long> appMasters = 116 new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2); 117 118 private File testWorkDir; 119 120 // Number of nm-local-dirs per nodemanager 121 private int numLocalDirs; 122 // Number of nm-log-dirs per nodemanager 123 private int numLogDirs; 124 private boolean enableAHS; 125 126 /** 127 * @param testName name of the test 128 * @param numResourceManagers the number of resource managers in the cluster 129 * @param numNodeManagers the number of node managers in the cluster 130 * @param numLocalDirs the number of nm-local-dirs per nodemanager 131 * @param numLogDirs the number of nm-log-dirs per nodemanager 132 * @param enableAHS enable ApplicationHistoryServer or not 133 */ MiniYARNCluster( String testName, int numResourceManagers, int numNodeManagers, int numLocalDirs, int numLogDirs, boolean enableAHS)134 public MiniYARNCluster( 135 String testName, int numResourceManagers, int numNodeManagers, 136 int numLocalDirs, int numLogDirs, boolean enableAHS) { 137 super(testName.replace("$", "")); 138 this.numLocalDirs = numLocalDirs; 139 this.numLogDirs = numLogDirs; 140 this.enableAHS = enableAHS; 141 String testSubDir = testName.replace("$", ""); 142 File targetWorkDir = new File("target", testSubDir); 143 try { 144 FileContext.getLocalFSFileContext().delete( 145 new Path(targetWorkDir.getAbsolutePath()), true); 146 } catch (Exception e) { 147 LOG.warn("COULD NOT CLEANUP", e); 148 throw new YarnRuntimeException("could not cleanup test dir: "+ e, e); 149 } 150 151 if (Shell.WINDOWS) { 152 // The test working directory can exceed the maximum path length supported 153 // by some Windows APIs and cmd.exe (260 characters). To work around this, 154 // create a symlink in temporary storage with a much shorter path, 155 // targeting the full path to the test working directory. Then, use the 156 // symlink as the test working directory. 157 String targetPath = targetWorkDir.getAbsolutePath(); 158 File link = new File(System.getProperty("java.io.tmpdir"), 159 String.valueOf(System.currentTimeMillis())); 160 String linkPath = link.getAbsolutePath(); 161 162 try { 163 FileContext.getLocalFSFileContext().delete(new Path(linkPath), true); 164 } catch (IOException e) { 165 throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e); 166 } 167 168 // Guarantee target exists before creating symlink. 169 targetWorkDir.mkdirs(); 170 171 ShellCommandExecutor shexec = new ShellCommandExecutor( 172 Shell.getSymlinkCommand(targetPath, linkPath)); 173 try { 174 shexec.execute(); 175 } catch (IOException e) { 176 throw new YarnRuntimeException(String.format( 177 "failed to create symlink from %s to %s, shell output: %s", linkPath, 178 targetPath, shexec.getOutput()), e); 179 } 180 181 this.testWorkDir = link; 182 } else { 183 this.testWorkDir = targetWorkDir; 184 } 185 186 resourceManagers = new ResourceManager[numResourceManagers]; 187 nodeManagers = new NodeManager[numNodeManagers]; 188 } 189 190 /** 191 * @param testName name of the test 192 * @param numResourceManagers the number of resource managers in the cluster 193 * @param numNodeManagers the number of node managers in the cluster 194 * @param numLocalDirs the number of nm-local-dirs per nodemanager 195 * @param numLogDirs the number of nm-log-dirs per nodemanager 196 */ MiniYARNCluster( String testName, int numResourceManagers, int numNodeManagers, int numLocalDirs, int numLogDirs)197 public MiniYARNCluster( 198 String testName, int numResourceManagers, int numNodeManagers, 199 int numLocalDirs, int numLogDirs) { 200 this(testName, numResourceManagers, numNodeManagers, numLocalDirs, 201 numLogDirs, false); 202 } 203 204 /** 205 * @param testName name of the test 206 * @param numNodeManagers the number of node managers in the cluster 207 * @param numLocalDirs the number of nm-local-dirs per nodemanager 208 * @param numLogDirs the number of nm-log-dirs per nodemanager 209 */ MiniYARNCluster(String testName, int numNodeManagers, int numLocalDirs, int numLogDirs)210 public MiniYARNCluster(String testName, int numNodeManagers, 211 int numLocalDirs, int numLogDirs) { 212 this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs); 213 } 214 215 @Override serviceInit(Configuration conf)216 public void serviceInit(Configuration conf) throws Exception { 217 useFixedPorts = conf.getBoolean( 218 YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, 219 YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS); 220 useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, 221 YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC); 222 failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 223 YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); 224 225 if (useRpc && !useFixedPorts) { 226 throw new YarnRuntimeException("Invalid configuration!" + 227 " Minicluster can use rpc only when configured to use fixed ports"); 228 } 229 230 conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true); 231 if (resourceManagers.length > 1) { 232 conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); 233 if (conf.get(YarnConfiguration.RM_HA_IDS) == null) { 234 StringBuilder rmIds = new StringBuilder(); 235 for (int i = 0; i < resourceManagers.length; i++) { 236 if (i != 0) { 237 rmIds.append(","); 238 } 239 rmIds.append("rm" + i); 240 } 241 conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString()); 242 } 243 Collection<String> rmIdsCollection = HAUtil.getRMHAIds(conf); 244 rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]); 245 } 246 247 for (int i = 0; i < resourceManagers.length; i++) { 248 resourceManagers[i] = createResourceManager(); 249 if (!useFixedPorts) { 250 if (HAUtil.isHAEnabled(conf)) { 251 setHARMConfigurationWithEphemeralPorts(i, conf); 252 } else { 253 setNonHARMConfigurationWithEphemeralPorts(conf); 254 } 255 } 256 addService(new ResourceManagerWrapper(i)); 257 } 258 for(int index = 0; index < nodeManagers.length; index++) { 259 nodeManagers[index] = 260 useRpc ? new CustomNodeManager() : new ShortCircuitedNodeManager(); 261 addService(new NodeManagerWrapper(index)); 262 } 263 264 if(conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 265 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) || enableAHS) { 266 addService(new ApplicationHistoryServerWrapper()); 267 } 268 269 super.serviceInit( 270 conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); 271 } 272 setNonHARMConfigurationWithEphemeralPorts(Configuration conf)273 private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) { 274 String hostname = MiniYARNCluster.getHostname(); 275 conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); 276 conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0"); 277 conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0"); 278 conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0"); 279 WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0); 280 } 281 setHARMConfigurationWithEphemeralPorts(final int index, Configuration conf)282 private void setHARMConfigurationWithEphemeralPorts(final int index, Configuration conf) { 283 String hostname = MiniYARNCluster.getHostname(); 284 for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) { 285 conf.set(HAUtil.addSuffix(confKey, rmIds[index]), hostname + ":0"); 286 } 287 } 288 initResourceManager(int index, Configuration conf)289 private synchronized void initResourceManager(int index, Configuration conf) { 290 if (HAUtil.isHAEnabled(conf)) { 291 conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]); 292 } 293 resourceManagers[index].init(conf); 294 resourceManagers[index].getRMContext().getDispatcher().register( 295 RMAppAttemptEventType.class, 296 new EventHandler<RMAppAttemptEvent>() { 297 public void handle(RMAppAttemptEvent event) { 298 if (event instanceof RMAppAttemptRegistrationEvent) { 299 appMasters.put(event.getApplicationAttemptId(), 300 event.getTimestamp()); 301 } else if (event instanceof RMAppAttemptUnregistrationEvent) { 302 appMasters.remove(event.getApplicationAttemptId()); 303 } 304 } 305 }); 306 } 307 startResourceManager(final int index)308 private synchronized void startResourceManager(final int index) { 309 try { 310 Thread rmThread = new Thread() { 311 public void run() { 312 resourceManagers[index].start(); 313 } 314 }; 315 rmThread.setName("RM-" + index); 316 rmThread.start(); 317 int waitCount = 0; 318 while (resourceManagers[index].getServiceState() == STATE.INITED 319 && waitCount++ < 60) { 320 LOG.info("Waiting for RM to start..."); 321 Thread.sleep(1500); 322 } 323 if (resourceManagers[index].getServiceState() != STATE.STARTED) { 324 // RM could have failed. 325 throw new IOException( 326 "ResourceManager failed to start. Final state is " 327 + resourceManagers[index].getServiceState()); 328 } 329 } catch (Throwable t) { 330 throw new YarnRuntimeException(t); 331 } 332 LOG.info("MiniYARN ResourceManager address: " + 333 getConfig().get(YarnConfiguration.RM_ADDRESS)); 334 LOG.info("MiniYARN ResourceManager web address: " + 335 WebAppUtils.getRMWebAppURLWithoutScheme(getConfig())); 336 } 337 338 @InterfaceAudience.Private 339 @VisibleForTesting stopResourceManager(int index)340 public synchronized void stopResourceManager(int index) { 341 if (resourceManagers[index] != null) { 342 resourceManagers[index].stop(); 343 resourceManagers[index] = null; 344 } 345 } 346 347 @InterfaceAudience.Private 348 @VisibleForTesting restartResourceManager(int index)349 public synchronized void restartResourceManager(int index) 350 throws InterruptedException { 351 if (resourceManagers[index] != null) { 352 resourceManagers[index].stop(); 353 resourceManagers[index] = null; 354 } 355 Configuration conf = getConfig(); 356 resourceManagers[index] = new ResourceManager(); 357 initResourceManager(index, getConfig()); 358 startResourceManager(index); 359 } 360 getTestWorkDir()361 public File getTestWorkDir() { 362 return testWorkDir; 363 } 364 365 /** 366 * In a HA cluster, go through all the RMs and find the Active RM. In a 367 * non-HA cluster, return the index of the only RM. 368 * 369 * @return index of the active RM or -1 if none of them turn active 370 */ 371 @InterfaceAudience.Private 372 @VisibleForTesting getActiveRMIndex()373 public int getActiveRMIndex() { 374 if (resourceManagers.length == 1) { 375 return 0; 376 } 377 378 int numRetriesForRMBecomingActive = failoverTimeout / 100; 379 while (numRetriesForRMBecomingActive-- > 0) { 380 for (int i = 0; i < resourceManagers.length; i++) { 381 if (resourceManagers[i] == null) { 382 continue; 383 } 384 try { 385 if (HAServiceProtocol.HAServiceState.ACTIVE == 386 resourceManagers[i].getRMContext().getRMAdminService() 387 .getServiceStatus().getState()) { 388 return i; 389 } 390 } catch (IOException e) { 391 throw new YarnRuntimeException("Couldn't read the status of " + 392 "a ResourceManger in the HA ensemble.", e); 393 } 394 } 395 try { 396 Thread.sleep(100); 397 } catch (InterruptedException e) { 398 throw new YarnRuntimeException("Interrupted while waiting for one " + 399 "of the ResourceManagers to become active"); 400 } 401 } 402 return -1; 403 } 404 405 /** 406 * @return the active {@link ResourceManager} of the cluster, 407 * null if none of them are active. 408 */ getResourceManager()409 public ResourceManager getResourceManager() { 410 int activeRMIndex = getActiveRMIndex(); 411 return activeRMIndex == -1 412 ? null 413 : this.resourceManagers[activeRMIndex]; 414 } 415 getResourceManager(int i)416 public ResourceManager getResourceManager(int i) { 417 return this.resourceManagers[i]; 418 } 419 getNodeManager(int i)420 public NodeManager getNodeManager(int i) { 421 return this.nodeManagers[i]; 422 } 423 getHostname()424 public static String getHostname() { 425 try { 426 return InetAddress.getLocalHost().getHostName(); 427 } 428 catch (UnknownHostException ex) { 429 throw new RuntimeException(ex); 430 } 431 } 432 433 private class ResourceManagerWrapper extends AbstractService { 434 private int index; 435 ResourceManagerWrapper(int i)436 public ResourceManagerWrapper(int i) { 437 super(ResourceManagerWrapper.class.getName() + "_" + i); 438 index = i; 439 } 440 441 @Override serviceInit(Configuration conf)442 protected synchronized void serviceInit(Configuration conf) 443 throws Exception { 444 initResourceManager(index, conf); 445 super.serviceInit(conf); 446 } 447 448 @Override serviceStart()449 protected synchronized void serviceStart() throws Exception { 450 startResourceManager(index); 451 LOG.info("MiniYARN ResourceManager address: " + 452 getConfig().get(YarnConfiguration.RM_ADDRESS)); 453 LOG.info("MiniYARN ResourceManager web address: " + 454 WebAppUtils.getRMWebAppURLWithoutScheme(getConfig())); 455 super.serviceStart(); 456 } 457 waitForAppMastersToFinish(long timeoutMillis)458 private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException { 459 long started = System.currentTimeMillis(); 460 synchronized (appMasters) { 461 while (!appMasters.isEmpty() && System.currentTimeMillis() - started < timeoutMillis) { 462 appMasters.wait(1000); 463 } 464 } 465 if (!appMasters.isEmpty()) { 466 LOG.warn("Stopping RM while some app masters are still alive"); 467 } 468 } 469 470 @Override serviceStop()471 protected synchronized void serviceStop() throws Exception { 472 if (resourceManagers[index] != null) { 473 waitForAppMastersToFinish(5000); 474 resourceManagers[index].stop(); 475 } 476 477 if (Shell.WINDOWS) { 478 // On Windows, clean up the short temporary symlink that was created to 479 // work around path length limitation. 480 String testWorkDirPath = testWorkDir.getAbsolutePath(); 481 try { 482 FileContext.getLocalFSFileContext().delete(new Path(testWorkDirPath), 483 true); 484 } catch (IOException e) { 485 LOG.warn("could not cleanup symlink: " + 486 testWorkDir.getAbsolutePath()); 487 } 488 } 489 super.serviceStop(); 490 } 491 } 492 493 private class NodeManagerWrapper extends AbstractService { 494 int index = 0; 495 NodeManagerWrapper(int i)496 public NodeManagerWrapper(int i) { 497 super(NodeManagerWrapper.class.getName() + "_" + i); 498 index = i; 499 } 500 serviceInit(Configuration conf)501 protected synchronized void serviceInit(Configuration conf) 502 throws Exception { 503 Configuration config = new YarnConfiguration(conf); 504 // create nm-local-dirs and configure them for the nodemanager 505 String localDirsString = prepareDirs("local", numLocalDirs); 506 config.set(YarnConfiguration.NM_LOCAL_DIRS, localDirsString); 507 // create nm-log-dirs and configure them for the nodemanager 508 String logDirsString = prepareDirs("log", numLogDirs); 509 config.set(YarnConfiguration.NM_LOG_DIRS, logDirsString); 510 511 config.setInt(YarnConfiguration.NM_PMEM_MB, config.getInt( 512 YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, 513 YarnConfiguration.DEFAULT_YARN_MINICLUSTER_NM_PMEM_MB)); 514 515 config.set(YarnConfiguration.NM_ADDRESS, 516 MiniYARNCluster.getHostname() + ":0"); 517 config.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, 518 MiniYARNCluster.getHostname() + ":0"); 519 WebAppUtils 520 .setNMWebAppHostNameAndPort(config, 521 MiniYARNCluster.getHostname(), 0); 522 523 // Disable resource checks by default 524 if (!config.getBoolean( 525 YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, 526 YarnConfiguration. 527 DEFAULT_YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) { 528 config.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false); 529 config.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); 530 } 531 532 LOG.info("Starting NM: " + index); 533 nodeManagers[index].init(config); 534 super.serviceInit(config); 535 } 536 537 /** 538 * Create local/log directories 539 * @param dirType type of directories i.e. local dirs or log dirs 540 * @param numDirs number of directories 541 * @return the created directories as a comma delimited String 542 */ prepareDirs(String dirType, int numDirs)543 private String prepareDirs(String dirType, int numDirs) { 544 File []dirs = new File[numDirs]; 545 String dirsString = ""; 546 for (int i = 0; i < numDirs; i++) { 547 dirs[i]= new File(testWorkDir, MiniYARNCluster.this.getName() 548 + "-" + dirType + "Dir-nm-" + index + "_" + i); 549 dirs[i].mkdirs(); 550 LOG.info("Created " + dirType + "Dir in " + dirs[i].getAbsolutePath()); 551 String delimiter = (i > 0) ? "," : ""; 552 dirsString = dirsString.concat(delimiter + dirs[i].getAbsolutePath()); 553 } 554 return dirsString; 555 } 556 serviceStart()557 protected synchronized void serviceStart() throws Exception { 558 try { 559 new Thread() { 560 public void run() { 561 nodeManagers[index].start(); 562 } 563 }.start(); 564 int waitCount = 0; 565 while (nodeManagers[index].getServiceState() == STATE.INITED 566 && waitCount++ < 60) { 567 LOG.info("Waiting for NM " + index + " to start..."); 568 Thread.sleep(1000); 569 } 570 if (nodeManagers[index].getServiceState() != STATE.STARTED) { 571 // RM could have failed. 572 throw new IOException("NodeManager " + index + " failed to start"); 573 } 574 super.serviceStart(); 575 } catch (Throwable t) { 576 throw new YarnRuntimeException(t); 577 } 578 } 579 580 @Override serviceStop()581 protected synchronized void serviceStop() throws Exception { 582 if (nodeManagers[index] != null) { 583 nodeManagers[index].stop(); 584 } 585 super.serviceStop(); 586 } 587 } 588 589 private class CustomNodeManager extends NodeManager { 590 @Override doSecureLogin()591 protected void doSecureLogin() throws IOException { 592 // Don't try to login using keytab in the testcase. 593 } 594 } 595 596 private class ShortCircuitedNodeManager extends CustomNodeManager { 597 @Override createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker)598 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 599 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 600 return new NodeStatusUpdaterImpl(context, dispatcher, 601 healthChecker, metrics) { 602 @Override 603 protected ResourceTracker getRMClient() { 604 final ResourceTrackerService rt = 605 getResourceManager().getResourceTrackerService(); 606 final RecordFactory recordFactory = 607 RecordFactoryProvider.getRecordFactory(null); 608 609 // For in-process communication without RPC 610 return new ResourceTracker() { 611 612 @Override 613 public NodeHeartbeatResponse nodeHeartbeat( 614 NodeHeartbeatRequest request) throws YarnException, 615 IOException { 616 NodeHeartbeatResponse response; 617 try { 618 response = rt.nodeHeartbeat(request); 619 } catch (YarnException e) { 620 LOG.info("Exception in heartbeat from node " + 621 request.getNodeStatus().getNodeId(), e); 622 throw e; 623 } 624 return response; 625 } 626 627 @Override 628 public RegisterNodeManagerResponse registerNodeManager( 629 RegisterNodeManagerRequest request) 630 throws YarnException, IOException { 631 RegisterNodeManagerResponse response; 632 try { 633 response = rt.registerNodeManager(request); 634 } catch (YarnException e) { 635 LOG.info("Exception in node registration from " 636 + request.getNodeId().toString(), e); 637 throw e; 638 } 639 return response; 640 } 641 }; 642 } 643 644 @Override 645 protected void stopRMProxy() { } 646 }; 647 } 648 } 649 650 /** 651 * Wait for all the NodeManagers to connect to the ResourceManager. 652 * 653 * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds. 654 * @return true if all NodeManagers connect to the (Active) 655 * ResourceManager, false otherwise. 656 * @throws YarnException 657 * @throws InterruptedException 658 */ 659 public boolean waitForNodeManagersToConnect(long timeout) 660 throws YarnException, InterruptedException { 661 GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); 662 for (int i = 0; i < timeout / 100; i++) { 663 ResourceManager rm = getResourceManager(); 664 if (rm == null) { 665 throw new YarnException("Can not find the active RM."); 666 } 667 else if (nodeManagers.length == rm.getClientRMService() 668 .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { 669 return true; 670 } 671 Thread.sleep(100); 672 } 673 return false; 674 } 675 676 private class ApplicationHistoryServerWrapper extends AbstractService { 677 public ApplicationHistoryServerWrapper() { 678 super(ApplicationHistoryServerWrapper.class.getName()); 679 } 680 681 @Override 682 protected synchronized void serviceInit(Configuration conf) 683 throws Exception { 684 appHistoryServer = new ApplicationHistoryServer(); 685 conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE, 686 MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class); 687 conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, 688 MemoryTimelineStore.class, TimelineStore.class); 689 conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, 690 MemoryTimelineStateStore.class, TimelineStateStore.class); 691 if (!useFixedPorts) { 692 String hostname = MiniYARNCluster.getHostname(); 693 conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0"); 694 conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, hostname 695 + ":0"); 696 } 697 appHistoryServer.init(conf); 698 super.serviceInit(conf); 699 } 700 701 @Override 702 protected synchronized void serviceStart() throws Exception { 703 try { 704 new Thread() { 705 public void run() { 706 appHistoryServer.start(); 707 }; 708 }.start(); 709 int waitCount = 0; 710 while (appHistoryServer.getServiceState() == STATE.INITED 711 && waitCount++ < 60) { 712 LOG.info("Waiting for Timeline Server to start..."); 713 Thread.sleep(1500); 714 } 715 if (appHistoryServer.getServiceState() != STATE.STARTED) { 716 // AHS could have failed. 717 throw new IOException( 718 "ApplicationHistoryServer failed to start. Final state is " 719 + appHistoryServer.getServiceState()); 720 } 721 super.serviceStart(); 722 } catch (Throwable t) { 723 throw new YarnRuntimeException(t); 724 } 725 LOG.info("MiniYARN ApplicationHistoryServer address: " 726 + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS)); 727 LOG.info("MiniYARN ApplicationHistoryServer web address: " 728 + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS)); 729 } 730 731 @Override 732 protected synchronized void serviceStop() throws Exception { 733 if (appHistoryServer != null) { 734 appHistoryServer.stop(); 735 } 736 } 737 } 738 739 public ApplicationHistoryServer getApplicationHistoryServer() { 740 return this.appHistoryServer; 741 } 742 743 protected ResourceManager createResourceManager() { 744 return new ResourceManager(){ 745 @Override 746 protected void doSecureLogin() throws IOException { 747 // Don't try to login using keytab in the testcases. 748 } 749 }; 750 } 751 752 public int getNumOfResourceManager() { 753 return this.resourceManagers.length; 754 } 755 } 756