1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server;
20 
21 import java.io.File;
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.Collection;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.classification.InterfaceStability;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileContext;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.ha.HAServiceProtocol;
37 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
38 import org.apache.hadoop.service.AbstractService;
39 import org.apache.hadoop.service.CompositeService;
40 import org.apache.hadoop.util.Shell;
41 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
42 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
43 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
44 import org.apache.hadoop.yarn.conf.HAUtil;
45 import org.apache.hadoop.yarn.conf.YarnConfiguration;
46 import org.apache.hadoop.yarn.event.Dispatcher;
47 import org.apache.hadoop.yarn.event.EventHandler;
48 import org.apache.hadoop.yarn.exceptions.YarnException;
49 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
50 import org.apache.hadoop.yarn.factories.RecordFactory;
51 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
52 import org.apache.hadoop.yarn.server.api.ResourceTracker;
53 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
54 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
55 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
56 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
57 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
58 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
59 import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
60 import org.apache.hadoop.yarn.server.nodemanager.Context;
61 import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
62 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
63 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
64 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
65 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
66 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
67 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
68 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
69 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
70 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
71 import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
72 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
73 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
74 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
75 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
76 
77 import com.google.common.annotations.VisibleForTesting;
78 
/**
 * Embedded Yarn minicluster for testcases that need to interact with a cluster.
 * <p>
 * In a real cluster, resource request matching is done using the hostname, and
 * by default Yarn minicluster works in the exact same way as a real cluster.
 * <p>
 * If a testcase needs to use multiple nodes and exercise resource request
 * matching to a specific node, then the property
 * {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} should be
 * set to <code>true</code> in the configuration used to initialize the
 * minicluster.
 * <p>
 * With this property set to <code>true</code>, the matching will be done using
 * the <code>hostname:port</code> of the nodemanagers. In such a case, the AM
 * must make resource requests using <code>hostname:port</code> as the location.
 */
94 @InterfaceAudience.Public
95 @InterfaceStability.Evolving
96 public class MiniYARNCluster extends CompositeService {
97 
98   private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class);
99 
100   // temp fix until metrics system can auto-detect itself running in unit test:
101   static {
102     DefaultMetricsSystem.setMiniClusterMode(true);
103   }
104 
105   private NodeManager[] nodeManagers;
106   private ResourceManager[] resourceManagers;
107   private String[] rmIds;
108 
109   private ApplicationHistoryServer appHistoryServer;
110 
111   private boolean useFixedPorts;
112   private boolean useRpc = false;
113   private int failoverTimeout;
114 
115   private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
116       new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
117 
118   private File testWorkDir;
119 
120   // Number of nm-local-dirs per nodemanager
121   private int numLocalDirs;
122   // Number of nm-log-dirs per nodemanager
123   private int numLogDirs;
124   private boolean enableAHS;
125 
126   /**
127    * @param testName name of the test
128    * @param numResourceManagers the number of resource managers in the cluster
129    * @param numNodeManagers the number of node managers in the cluster
130    * @param numLocalDirs the number of nm-local-dirs per nodemanager
131    * @param numLogDirs the number of nm-log-dirs per nodemanager
132    * @param enableAHS enable ApplicationHistoryServer or not
133    */
MiniYARNCluster( String testName, int numResourceManagers, int numNodeManagers, int numLocalDirs, int numLogDirs, boolean enableAHS)134   public MiniYARNCluster(
135       String testName, int numResourceManagers, int numNodeManagers,
136       int numLocalDirs, int numLogDirs, boolean enableAHS) {
137     super(testName.replace("$", ""));
138     this.numLocalDirs = numLocalDirs;
139     this.numLogDirs = numLogDirs;
140     this.enableAHS = enableAHS;
141     String testSubDir = testName.replace("$", "");
142     File targetWorkDir = new File("target", testSubDir);
143     try {
144       FileContext.getLocalFSFileContext().delete(
145           new Path(targetWorkDir.getAbsolutePath()), true);
146     } catch (Exception e) {
147       LOG.warn("COULD NOT CLEANUP", e);
148       throw new YarnRuntimeException("could not cleanup test dir: "+ e, e);
149     }
150 
151     if (Shell.WINDOWS) {
152       // The test working directory can exceed the maximum path length supported
153       // by some Windows APIs and cmd.exe (260 characters).  To work around this,
154       // create a symlink in temporary storage with a much shorter path,
155       // targeting the full path to the test working directory.  Then, use the
156       // symlink as the test working directory.
157       String targetPath = targetWorkDir.getAbsolutePath();
158       File link = new File(System.getProperty("java.io.tmpdir"),
159         String.valueOf(System.currentTimeMillis()));
160       String linkPath = link.getAbsolutePath();
161 
162       try {
163         FileContext.getLocalFSFileContext().delete(new Path(linkPath), true);
164       } catch (IOException e) {
165         throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e);
166       }
167 
168       // Guarantee target exists before creating symlink.
169       targetWorkDir.mkdirs();
170 
171       ShellCommandExecutor shexec = new ShellCommandExecutor(
172         Shell.getSymlinkCommand(targetPath, linkPath));
173       try {
174         shexec.execute();
175       } catch (IOException e) {
176         throw new YarnRuntimeException(String.format(
177           "failed to create symlink from %s to %s, shell output: %s", linkPath,
178           targetPath, shexec.getOutput()), e);
179       }
180 
181       this.testWorkDir = link;
182     } else {
183       this.testWorkDir = targetWorkDir;
184     }
185 
186     resourceManagers = new ResourceManager[numResourceManagers];
187     nodeManagers = new NodeManager[numNodeManagers];
188   }
189 
190   /**
191    * @param testName name of the test
192    * @param numResourceManagers the number of resource managers in the cluster
193    * @param numNodeManagers the number of node managers in the cluster
194    * @param numLocalDirs the number of nm-local-dirs per nodemanager
195    * @param numLogDirs the number of nm-log-dirs per nodemanager
196    */
MiniYARNCluster( String testName, int numResourceManagers, int numNodeManagers, int numLocalDirs, int numLogDirs)197   public MiniYARNCluster(
198       String testName, int numResourceManagers, int numNodeManagers,
199       int numLocalDirs, int numLogDirs) {
200     this(testName, numResourceManagers, numNodeManagers, numLocalDirs,
201         numLogDirs, false);
202   }
203 
204   /**
205    * @param testName name of the test
206    * @param numNodeManagers the number of node managers in the cluster
207    * @param numLocalDirs the number of nm-local-dirs per nodemanager
208    * @param numLogDirs the number of nm-log-dirs per nodemanager
209    */
MiniYARNCluster(String testName, int numNodeManagers, int numLocalDirs, int numLogDirs)210   public MiniYARNCluster(String testName, int numNodeManagers,
211                          int numLocalDirs, int numLogDirs) {
212     this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
213   }
214 
215   @Override
serviceInit(Configuration conf)216   public void serviceInit(Configuration conf) throws Exception {
217     useFixedPorts = conf.getBoolean(
218         YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
219         YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
220     useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
221         YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
222     failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
223         YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
224 
225     if (useRpc && !useFixedPorts) {
226       throw new YarnRuntimeException("Invalid configuration!" +
227           " Minicluster can use rpc only when configured to use fixed ports");
228     }
229 
230     conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
231     if (resourceManagers.length > 1) {
232       conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
233       if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
234         StringBuilder rmIds = new StringBuilder();
235         for (int i = 0; i < resourceManagers.length; i++) {
236           if (i != 0) {
237             rmIds.append(",");
238           }
239           rmIds.append("rm" + i);
240         }
241         conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
242       }
243       Collection<String> rmIdsCollection = HAUtil.getRMHAIds(conf);
244       rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]);
245     }
246 
247     for (int i = 0; i < resourceManagers.length; i++) {
248       resourceManagers[i] = createResourceManager();
249       if (!useFixedPorts) {
250         if (HAUtil.isHAEnabled(conf)) {
251           setHARMConfigurationWithEphemeralPorts(i, conf);
252         } else {
253           setNonHARMConfigurationWithEphemeralPorts(conf);
254         }
255       }
256       addService(new ResourceManagerWrapper(i));
257     }
258     for(int index = 0; index < nodeManagers.length; index++) {
259       nodeManagers[index] =
260           useRpc ? new CustomNodeManager() : new ShortCircuitedNodeManager();
261       addService(new NodeManagerWrapper(index));
262     }
263 
264     if(conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
265         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) || enableAHS) {
266         addService(new ApplicationHistoryServerWrapper());
267     }
268 
269     super.serviceInit(
270         conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
271   }
272 
setNonHARMConfigurationWithEphemeralPorts(Configuration conf)273   private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) {
274     String hostname = MiniYARNCluster.getHostname();
275     conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
276     conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
277     conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
278     conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
279     WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
280   }
281 
setHARMConfigurationWithEphemeralPorts(final int index, Configuration conf)282   private void setHARMConfigurationWithEphemeralPorts(final int index, Configuration conf) {
283     String hostname = MiniYARNCluster.getHostname();
284     for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
285       conf.set(HAUtil.addSuffix(confKey, rmIds[index]), hostname + ":0");
286     }
287   }
288 
initResourceManager(int index, Configuration conf)289   private synchronized void initResourceManager(int index, Configuration conf) {
290     if (HAUtil.isHAEnabled(conf)) {
291       conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
292     }
293     resourceManagers[index].init(conf);
294     resourceManagers[index].getRMContext().getDispatcher().register(
295         RMAppAttemptEventType.class,
296         new EventHandler<RMAppAttemptEvent>() {
297           public void handle(RMAppAttemptEvent event) {
298             if (event instanceof RMAppAttemptRegistrationEvent) {
299               appMasters.put(event.getApplicationAttemptId(),
300                   event.getTimestamp());
301             } else if (event instanceof RMAppAttemptUnregistrationEvent) {
302               appMasters.remove(event.getApplicationAttemptId());
303             }
304           }
305         });
306   }
307 
startResourceManager(final int index)308   private synchronized void startResourceManager(final int index) {
309     try {
310       Thread rmThread = new Thread() {
311         public void run() {
312           resourceManagers[index].start();
313         }
314       };
315       rmThread.setName("RM-" + index);
316       rmThread.start();
317       int waitCount = 0;
318       while (resourceManagers[index].getServiceState() == STATE.INITED
319           && waitCount++ < 60) {
320         LOG.info("Waiting for RM to start...");
321         Thread.sleep(1500);
322       }
323       if (resourceManagers[index].getServiceState() != STATE.STARTED) {
324         // RM could have failed.
325         throw new IOException(
326             "ResourceManager failed to start. Final state is "
327                 + resourceManagers[index].getServiceState());
328       }
329     } catch (Throwable t) {
330       throw new YarnRuntimeException(t);
331     }
332     LOG.info("MiniYARN ResourceManager address: " +
333         getConfig().get(YarnConfiguration.RM_ADDRESS));
334     LOG.info("MiniYARN ResourceManager web address: " +
335         WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
336   }
337 
338   @InterfaceAudience.Private
339   @VisibleForTesting
stopResourceManager(int index)340   public synchronized void stopResourceManager(int index) {
341     if (resourceManagers[index] != null) {
342       resourceManagers[index].stop();
343       resourceManagers[index] = null;
344     }
345   }
346 
347   @InterfaceAudience.Private
348   @VisibleForTesting
restartResourceManager(int index)349   public synchronized void restartResourceManager(int index)
350       throws InterruptedException {
351     if (resourceManagers[index] != null) {
352       resourceManagers[index].stop();
353       resourceManagers[index] = null;
354     }
355     Configuration conf = getConfig();
356     resourceManagers[index] = new ResourceManager();
357     initResourceManager(index, getConfig());
358     startResourceManager(index);
359   }
360 
getTestWorkDir()361   public File getTestWorkDir() {
362     return testWorkDir;
363   }
364 
365   /**
366    * In a HA cluster, go through all the RMs and find the Active RM. In a
367    * non-HA cluster, return the index of the only RM.
368    *
369    * @return index of the active RM or -1 if none of them turn active
370    */
371   @InterfaceAudience.Private
372   @VisibleForTesting
getActiveRMIndex()373   public int getActiveRMIndex() {
374     if (resourceManagers.length == 1) {
375       return 0;
376     }
377 
378     int numRetriesForRMBecomingActive = failoverTimeout / 100;
379     while (numRetriesForRMBecomingActive-- > 0) {
380       for (int i = 0; i < resourceManagers.length; i++) {
381         if (resourceManagers[i] == null) {
382           continue;
383         }
384         try {
385           if (HAServiceProtocol.HAServiceState.ACTIVE ==
386               resourceManagers[i].getRMContext().getRMAdminService()
387                   .getServiceStatus().getState()) {
388             return i;
389           }
390         } catch (IOException e) {
391           throw new YarnRuntimeException("Couldn't read the status of " +
392               "a ResourceManger in the HA ensemble.", e);
393         }
394       }
395       try {
396         Thread.sleep(100);
397       } catch (InterruptedException e) {
398         throw new YarnRuntimeException("Interrupted while waiting for one " +
399             "of the ResourceManagers to become active");
400       }
401     }
402     return -1;
403   }
404 
405   /**
406    * @return the active {@link ResourceManager} of the cluster,
407    * null if none of them are active.
408    */
getResourceManager()409   public ResourceManager getResourceManager() {
410     int activeRMIndex = getActiveRMIndex();
411     return activeRMIndex == -1
412         ? null
413         : this.resourceManagers[activeRMIndex];
414   }
415 
getResourceManager(int i)416   public ResourceManager getResourceManager(int i) {
417     return this.resourceManagers[i];
418   }
419 
getNodeManager(int i)420   public NodeManager getNodeManager(int i) {
421     return this.nodeManagers[i];
422   }
423 
getHostname()424   public static String getHostname() {
425     try {
426       return InetAddress.getLocalHost().getHostName();
427     }
428     catch (UnknownHostException ex) {
429       throw new RuntimeException(ex);
430     }
431   }
432 
433   private class ResourceManagerWrapper extends AbstractService {
434     private int index;
435 
ResourceManagerWrapper(int i)436     public ResourceManagerWrapper(int i) {
437       super(ResourceManagerWrapper.class.getName() + "_" + i);
438       index = i;
439     }
440 
441     @Override
serviceInit(Configuration conf)442     protected synchronized void serviceInit(Configuration conf)
443         throws Exception {
444       initResourceManager(index, conf);
445       super.serviceInit(conf);
446     }
447 
448     @Override
serviceStart()449     protected synchronized void serviceStart() throws Exception {
450       startResourceManager(index);
451       LOG.info("MiniYARN ResourceManager address: " +
452                getConfig().get(YarnConfiguration.RM_ADDRESS));
453       LOG.info("MiniYARN ResourceManager web address: " +
454                WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
455       super.serviceStart();
456     }
457 
waitForAppMastersToFinish(long timeoutMillis)458     private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException {
459       long started = System.currentTimeMillis();
460       synchronized (appMasters) {
461         while (!appMasters.isEmpty() && System.currentTimeMillis() - started < timeoutMillis) {
462           appMasters.wait(1000);
463         }
464       }
465       if (!appMasters.isEmpty()) {
466         LOG.warn("Stopping RM while some app masters are still alive");
467       }
468     }
469 
470     @Override
serviceStop()471     protected synchronized void serviceStop() throws Exception {
472       if (resourceManagers[index] != null) {
473         waitForAppMastersToFinish(5000);
474         resourceManagers[index].stop();
475       }
476 
477       if (Shell.WINDOWS) {
478         // On Windows, clean up the short temporary symlink that was created to
479         // work around path length limitation.
480         String testWorkDirPath = testWorkDir.getAbsolutePath();
481         try {
482           FileContext.getLocalFSFileContext().delete(new Path(testWorkDirPath),
483             true);
484         } catch (IOException e) {
485           LOG.warn("could not cleanup symlink: " +
486             testWorkDir.getAbsolutePath());
487         }
488       }
489       super.serviceStop();
490     }
491   }
492 
493   private class NodeManagerWrapper extends AbstractService {
494     int index = 0;
495 
NodeManagerWrapper(int i)496     public NodeManagerWrapper(int i) {
497       super(NodeManagerWrapper.class.getName() + "_" + i);
498       index = i;
499     }
500 
serviceInit(Configuration conf)501     protected synchronized void serviceInit(Configuration conf)
502         throws Exception {
503       Configuration config = new YarnConfiguration(conf);
504       // create nm-local-dirs and configure them for the nodemanager
505       String localDirsString = prepareDirs("local", numLocalDirs);
506       config.set(YarnConfiguration.NM_LOCAL_DIRS, localDirsString);
507       // create nm-log-dirs and configure them for the nodemanager
508       String logDirsString = prepareDirs("log", numLogDirs);
509       config.set(YarnConfiguration.NM_LOG_DIRS, logDirsString);
510 
511       config.setInt(YarnConfiguration.NM_PMEM_MB, config.getInt(
512           YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB,
513           YarnConfiguration.DEFAULT_YARN_MINICLUSTER_NM_PMEM_MB));
514 
515       config.set(YarnConfiguration.NM_ADDRESS,
516           MiniYARNCluster.getHostname() + ":0");
517       config.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
518           MiniYARNCluster.getHostname() + ":0");
519       WebAppUtils
520           .setNMWebAppHostNameAndPort(config,
521               MiniYARNCluster.getHostname(), 0);
522 
523       // Disable resource checks by default
524       if (!config.getBoolean(
525           YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
526           YarnConfiguration.
527               DEFAULT_YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
528         config.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
529         config.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
530       }
531 
532       LOG.info("Starting NM: " + index);
533       nodeManagers[index].init(config);
534       super.serviceInit(config);
535     }
536 
537     /**
538      * Create local/log directories
539      * @param dirType type of directories i.e. local dirs or log dirs
540      * @param numDirs number of directories
541      * @return the created directories as a comma delimited String
542      */
prepareDirs(String dirType, int numDirs)543     private String prepareDirs(String dirType, int numDirs) {
544       File []dirs = new File[numDirs];
545       String dirsString = "";
546       for (int i = 0; i < numDirs; i++) {
547         dirs[i]= new File(testWorkDir, MiniYARNCluster.this.getName()
548             + "-" + dirType + "Dir-nm-" + index + "_" + i);
549         dirs[i].mkdirs();
550         LOG.info("Created " + dirType + "Dir in " + dirs[i].getAbsolutePath());
551         String delimiter = (i > 0) ? "," : "";
552         dirsString = dirsString.concat(delimiter + dirs[i].getAbsolutePath());
553       }
554       return dirsString;
555     }
556 
serviceStart()557     protected synchronized void serviceStart() throws Exception {
558       try {
559         new Thread() {
560           public void run() {
561             nodeManagers[index].start();
562           }
563         }.start();
564         int waitCount = 0;
565         while (nodeManagers[index].getServiceState() == STATE.INITED
566             && waitCount++ < 60) {
567           LOG.info("Waiting for NM " + index + " to start...");
568           Thread.sleep(1000);
569         }
570         if (nodeManagers[index].getServiceState() != STATE.STARTED) {
571           // RM could have failed.
572           throw new IOException("NodeManager " + index + " failed to start");
573         }
574         super.serviceStart();
575       } catch (Throwable t) {
576         throw new YarnRuntimeException(t);
577       }
578     }
579 
580     @Override
serviceStop()581     protected synchronized void serviceStop() throws Exception {
582       if (nodeManagers[index] != null) {
583         nodeManagers[index].stop();
584       }
585       super.serviceStop();
586     }
587   }
588 
589   private class CustomNodeManager extends NodeManager {
590     @Override
doSecureLogin()591     protected void doSecureLogin() throws IOException {
592       // Don't try to login using keytab in the testcase.
593     }
594   }
595 
596   private class ShortCircuitedNodeManager extends CustomNodeManager {
597     @Override
createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker)598     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
599         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
600       return new NodeStatusUpdaterImpl(context, dispatcher,
601           healthChecker, metrics) {
602         @Override
603         protected ResourceTracker getRMClient() {
604           final ResourceTrackerService rt =
605               getResourceManager().getResourceTrackerService();
606           final RecordFactory recordFactory =
607             RecordFactoryProvider.getRecordFactory(null);
608 
609           // For in-process communication without RPC
610           return new ResourceTracker() {
611 
612             @Override
613             public NodeHeartbeatResponse nodeHeartbeat(
614                 NodeHeartbeatRequest request) throws YarnException,
615                 IOException {
616               NodeHeartbeatResponse response;
617               try {
618                 response = rt.nodeHeartbeat(request);
619               } catch (YarnException e) {
620                 LOG.info("Exception in heartbeat from node " +
621                     request.getNodeStatus().getNodeId(), e);
622                 throw e;
623               }
624               return response;
625             }
626 
627             @Override
628             public RegisterNodeManagerResponse registerNodeManager(
629                 RegisterNodeManagerRequest request)
630                 throws YarnException, IOException {
631               RegisterNodeManagerResponse response;
632               try {
633                 response = rt.registerNodeManager(request);
634               } catch (YarnException e) {
635                 LOG.info("Exception in node registration from "
636                     + request.getNodeId().toString(), e);
637                 throw e;
638               }
639               return response;
640             }
641           };
642         }
643 
644         @Override
645         protected void stopRMProxy() { }
646       };
647     }
648   }
649 
650   /**
651    * Wait for all the NodeManagers to connect to the ResourceManager.
652    *
653    * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds.
654    * @return true if all NodeManagers connect to the (Active)
655    * ResourceManager, false otherwise.
656    * @throws YarnException
657    * @throws InterruptedException
658    */
659   public boolean waitForNodeManagersToConnect(long timeout)
660       throws YarnException, InterruptedException {
661     GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
662     for (int i = 0; i < timeout / 100; i++) {
663       ResourceManager rm = getResourceManager();
664       if (rm == null) {
665         throw new YarnException("Can not find the active RM.");
666       }
667       else if (nodeManagers.length == rm.getClientRMService()
668             .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
669         return true;
670       }
671       Thread.sleep(100);
672     }
673     return false;
674   }
675 
676   private class ApplicationHistoryServerWrapper extends AbstractService {
677     public ApplicationHistoryServerWrapper() {
678       super(ApplicationHistoryServerWrapper.class.getName());
679     }
680 
681     @Override
682     protected synchronized void serviceInit(Configuration conf)
683         throws Exception {
684       appHistoryServer = new ApplicationHistoryServer();
685       conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
686           MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
687       conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
688           MemoryTimelineStore.class, TimelineStore.class);
689       conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
690           MemoryTimelineStateStore.class, TimelineStateStore.class);
691       if (!useFixedPorts) {
692         String hostname = MiniYARNCluster.getHostname();
693         conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0");
694         conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, hostname
695             + ":0");
696       }
697       appHistoryServer.init(conf);
698       super.serviceInit(conf);
699     }
700 
701     @Override
702     protected synchronized void serviceStart() throws Exception {
703       try {
704         new Thread() {
705           public void run() {
706             appHistoryServer.start();
707           };
708         }.start();
709         int waitCount = 0;
710         while (appHistoryServer.getServiceState() == STATE.INITED
711             && waitCount++ < 60) {
712           LOG.info("Waiting for Timeline Server to start...");
713           Thread.sleep(1500);
714         }
715         if (appHistoryServer.getServiceState() != STATE.STARTED) {
716           // AHS could have failed.
717           throw new IOException(
718               "ApplicationHistoryServer failed to start. Final state is "
719                   + appHistoryServer.getServiceState());
720         }
721         super.serviceStart();
722       } catch (Throwable t) {
723         throw new YarnRuntimeException(t);
724       }
725       LOG.info("MiniYARN ApplicationHistoryServer address: "
726           + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS));
727       LOG.info("MiniYARN ApplicationHistoryServer web address: "
728           + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS));
729     }
730 
731     @Override
732     protected synchronized void serviceStop() throws Exception {
733       if (appHistoryServer != null) {
734         appHistoryServer.stop();
735       }
736     }
737   }
738 
739   public ApplicationHistoryServer getApplicationHistoryServer() {
740     return this.appHistoryServer;
741   }
742 
743   protected ResourceManager createResourceManager() {
744     return new ResourceManager(){
745       @Override
746       protected void doSecureLogin() throws IOException {
747         // Don't try to login using keytab in the testcases.
748       }
749     };
750   }
751 
752   public int getNumOfResourceManager() {
753     return this.resourceManagers.length;
754   }
755 }
756